1use std::{
5 any::Any,
6 convert::Infallible,
7 net::{SocketAddr, TcpStream},
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_graphql::{
13 Context, Data, Schema, SchemaBuilder,
14 extensions::{ApolloTracing, ExtensionFactory, Tracing},
15 http::ALL_WEBSOCKET_PROTOCOLS,
16};
17use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
18use axum::{
19 Extension, Router,
20 body::Body,
21 extract::{
22 ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
23 },
24 http::{HeaderMap, StatusCode},
25 middleware::{self},
26 response::{IntoResponse, Response as AxumResponse},
27 routing::{MethodRouter, Route, get, post},
28};
29use axum_extra::{TypedHeader, headers::ContentLength};
30use chrono::Utc;
31use http::{HeaderValue, Method, Request};
32use iota_graphql_rpc_headers::LIMITS_HEADER;
33use iota_indexer::{
34 apis::{OptimisticWriteApi, WriteApi},
35 db::{get_pool_connection, setup_postgres::check_db_migration_consistency},
36 metrics::IndexerMetrics,
37 optimistic_indexing::OptimisticTransactionExecutor,
38 read::IndexerReader,
39 store::PgIndexerStore,
40};
41use iota_json_rpc_api::CLIENT_SDK_TYPE_HEADER;
42use iota_metrics::spawn_monitored_task;
43use iota_network_stack::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
44use iota_package_resolver::{PackageStoreWithLruCache, Resolver};
45use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
46use tokio::{join, net::TcpListener, sync::OnceCell};
47use tokio_util::sync::CancellationToken;
48use tower::{Layer, Service};
49use tower_http::cors::{AllowOrigin, CorsLayer};
50use tracing::{info, warn};
51use uuid::Uuid;
52
53use crate::{
54 config::{
55 ConnectionConfig, MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
56 ServerConfig, ServiceConfig, Version,
57 },
58 context_data::db_data_provider::PgManager,
59 data::{
60 DataLoader, Db,
61 package_resolver::{DbPackageStore, PackageResolver},
62 },
63 error::Error,
64 extensions::{
65 directive_checker::DirectiveChecker,
66 feature_gate::FeatureGate,
67 logger::Logger,
68 query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
69 timeout::Timeout,
70 },
71 metrics::Metrics,
72 mutation::Mutation,
73 server::{
74 exchange_rates_task::TriggerExchangeRatesTask,
75 system_package_task::SystemPackageTask,
76 version::{check_version_middleware, set_version_middleware},
77 watermark_task::{Watermark, WatermarkLock, WatermarkTask},
78 },
79 types::{
80 chain_identifier::ChainIdentifierCache,
81 datatype::IMoveDatatype,
82 move_object::IMoveObject,
83 object::IObject,
84 owner::IOwner,
85 query::{IotaGraphQLSchema, Query},
86 subscription::{GraphQLStream, Subscription},
87 },
88};
89
/// Checkpoint lag tolerated by the health check when the request does not
/// supply `max_checkpoint_lag_ms` (five minutes).
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(5 * 60);
93
94pub(crate) struct Server {
95 router: Router,
96 address: SocketAddr,
97 watermark_task: WatermarkTask,
98 system_package_task: SystemPackageTask,
99 trigger_exchange_rates_task: TriggerExchangeRatesTask,
100 state: AppState,
101}
102
103impl Server {
104 pub async fn run(mut self) -> Result<(), Error> {
108 get_or_init_server_start_time().await;
109
110 let watermark_task = {
114 info!("Starting watermark update task");
115 spawn_monitored_task!(async move {
116 self.watermark_task.run().await;
117 })
118 };
119
120 let system_package_task = {
123 info!("Starting system package task");
124 spawn_monitored_task!(async move {
125 self.system_package_task.run().await;
126 })
127 };
128
129 let trigger_exchange_rates_task = {
130 info!("Starting trigger exchange rates task");
131 spawn_monitored_task!(async move {
132 self.trigger_exchange_rates_task.run().await;
133 })
134 };
135
136 let server_task = {
137 info!("Starting graphql service");
138 let cancellation_token = self.state.cancellation_token.clone();
139 let address = self.address;
140 let router = self.router;
141 spawn_monitored_task!(async move {
142 axum::serve(
143 TcpListener::bind(address)
144 .await
145 .map_err(|e| Error::Internal(format!("listener bind failed: {e}")))?,
146 router.into_make_service_with_connect_info::<SocketAddr>(),
147 )
148 .with_graceful_shutdown(async move {
149 cancellation_token.cancelled().await;
150 info!("Shutdown signal received, terminating graphql service");
151 })
152 .await
153 .map_err(|e| Error::Internal(format!("Server run failed: {e}")))
154 })
155 };
156
157 let _ = join!(
161 watermark_task,
162 system_package_task,
163 trigger_exchange_rates_task,
164 server_task
165 );
166
167 Ok(())
168 }
169}
170
171pub(crate) struct ServerBuilder {
172 state: AppState,
173 schema: SchemaBuilder<Query, Mutation, Subscription>,
174 router: Option<Router>,
175 db_reader: Option<Db>,
176 resolver: Option<PackageResolver>,
177}
178
179#[derive(Clone)]
180pub(crate) struct AppState {
181 connection: ConnectionConfig,
182 service: ServiceConfig,
183 metrics: Metrics,
184 cancellation_token: CancellationToken,
185 pub version: Version,
186}
187
188impl AppState {
189 pub(crate) fn new(
190 connection: ConnectionConfig,
191 service: ServiceConfig,
192 metrics: Metrics,
193 cancellation_token: CancellationToken,
194 version: Version,
195 ) -> Self {
196 Self {
197 connection,
198 service,
199 metrics,
200 cancellation_token,
201 version,
202 }
203 }
204}
205
206impl FromRef<AppState> for ConnectionConfig {
207 fn from_ref(app_state: &AppState) -> ConnectionConfig {
208 app_state.connection.clone()
209 }
210}
211
212impl FromRef<AppState> for Metrics {
213 fn from_ref(app_state: &AppState) -> Metrics {
214 app_state.metrics.clone()
215 }
216}
217
218impl ServerBuilder {
219 pub fn new(state: AppState) -> Self {
220 Self {
221 state,
222 schema: schema_builder(),
223 router: None,
224 db_reader: None,
225 resolver: None,
226 }
227 }
228
229 pub fn address(&self) -> String {
230 format!(
231 "{}:{}",
232 self.state.connection.host, self.state.connection.port
233 )
234 }
235
236 pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
237 self.schema = self.schema.data(context_data);
238 self
239 }
240
241 pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
242 self.schema = self.schema.extension(extension);
243 self
244 }
245
246 fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
247 self.schema.finish()
248 }
249
250 fn build_components(
253 self,
254 ) -> (
255 String,
256 Schema<Query, Mutation, Subscription>,
257 Db,
258 PackageResolver,
259 Router,
260 ) {
261 let address = self.address();
262 let ServerBuilder {
263 state: _,
264 schema,
265 db_reader,
266 resolver,
267 router,
268 } = self;
269 (
270 address,
271 schema.finish(),
272 db_reader.expect("db reader not initialized"),
273 resolver.expect("package resolver not initialized"),
274 router.expect("router not initialized"),
275 )
276 }
277
278 fn init_router(&mut self) {
279 if self.router.is_none() {
280 let router: Router = Router::new()
281 .route("/", post(graphql_handler))
282 .route("/subscriptions", get(subscription_handler))
283 .route("/{version}", post(graphql_handler))
284 .route("/{version}/subscriptions", get(subscription_handler))
285 .route("/graphql", post(graphql_handler))
286 .route("/graphql/subscriptions", get(subscription_handler))
287 .route("/graphql/{version}", post(graphql_handler))
288 .route(
289 "/graphql/{version}/subscriptions",
290 get(subscription_handler),
291 )
292 .route("/health", get(health_check))
293 .route("/graphql/health", get(health_check))
294 .route("/graphql/{version}/health", get(health_check))
295 .with_state(self.state.clone())
296 .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
297 metrics: self.state.metrics.clone(),
298 }));
299 self.router = Some(router);
300 }
301 }
302
303 pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
304 self.init_router();
305 self.router = self.router.map(|router| router.route(path, method_handler));
306 self
307 }
308
309 pub fn layer<L>(mut self, layer: L) -> Self
310 where
311 L: Layer<Route> + Clone + Send + Sync + 'static,
312 L::Service: Service<Request<Body>> + Clone + Send + Sync + 'static,
313 <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
314 <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
315 <L::Service as Service<Request<Body>>>::Future: Send + 'static,
316 {
317 self.init_router();
318 self.router = self.router.map(|router| router.layer(layer));
319 self
320 }
321
322 fn cors() -> Result<CorsLayer, Error> {
323 let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
324 Ok(value) => {
325 let allow_hosts = value
326 .split(',')
327 .map(HeaderValue::from_str)
328 .collect::<Result<Vec<_>, _>>()
329 .map_err(|_| {
330 Error::Internal(
331 "Cannot resolve access control origin env variable".to_string(),
332 )
333 })?;
334 AllowOrigin::list(allow_hosts)
335 }
336 _ => AllowOrigin::any(),
337 };
338 info!("Access control allow origin set to: {acl:?}");
339
340 let cors = CorsLayer::new()
341 .allow_methods([Method::POST, Method::GET])
343 .allow_origin(acl)
345 .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
346 Ok(cors)
347 }
348
349 pub fn build(self) -> Result<Server, Error> {
351 let state = self.state.clone();
352 let (address, schema, db_reader, resolver, router) = self.build_components();
353
354 let watermark_task = WatermarkTask::new(
356 db_reader.clone(),
357 state.metrics.clone(),
358 std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
359 state.cancellation_token.clone(),
360 );
361
362 let system_package_task = SystemPackageTask::new(
363 resolver,
364 watermark_task.epoch_receiver(),
365 state.cancellation_token.clone(),
366 );
367
368 let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
369 db_reader,
370 watermark_task.epoch_receiver(),
371 state.cancellation_token.clone(),
372 );
373
374 let router = router
375 .route_layer(middleware::from_fn_with_state(
376 state.version,
377 set_version_middleware,
378 ))
379 .route_layer(middleware::from_fn_with_state(
380 state.version,
381 check_version_middleware,
382 ))
383 .layer(axum::extract::Extension(schema))
384 .layer(axum::extract::Extension(watermark_task.lock()))
385 .layer(Self::cors()?);
386
387 Ok(Server {
388 router,
389 address: address
390 .parse()
391 .map_err(|_| Error::Internal(format!("Failed to parse address {address}")))?,
392 watermark_task,
393 system_package_task,
394 trigger_exchange_rates_task,
395 state,
396 })
397 }
398
399 pub async fn from_config(
402 config: &ServerConfig,
403 version: &Version,
404 cancellation_token: CancellationToken,
405 ) -> Result<Self, Error> {
406 let prom_addr: SocketAddr = format!(
408 "{}:{}",
409 config.connection.prom_host, config.connection.prom_port
410 )
411 .parse()
412 .map_err(|_| {
413 Error::Internal(format!(
414 "Failed to parse url {}, port {} into socket address",
415 config.connection.prom_host, config.connection.prom_port
416 ))
417 })?;
418
419 let registry_service = iota_metrics::start_prometheus_server(prom_addr);
420 info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
421 let registry = registry_service.default_registry();
422 registry
423 .register(iota_metrics::uptime_metric(
424 "graphql",
425 version.full,
426 "unknown",
427 ))
428 .unwrap();
429
430 let metrics = Metrics::new(®istry);
432 let indexer_metrics = IndexerMetrics::new(®istry);
433 let state = AppState::new(
434 config.connection.clone(),
435 config.service.clone(),
436 metrics.clone(),
437 cancellation_token,
438 *version,
439 );
440 let mut builder = ServerBuilder::new(state);
441
442 let iota_names_config = config.service.iota_names.clone();
443 let zklogin_config = config.service.zklogin.clone();
444 let reader = PgManager::reader_with_config(
445 config.connection.db_url.clone(),
446 config.connection.db_pool_size,
447 config.service.limits.request_timeout_ms.into(),
451 )
452 .map_err(|e| Error::ServerInit(format!("Failed to create pg connection pool: {e}")))?;
453
454 if !config.connection.skip_migration_consistency_check {
455 info!("Starting compatibility check");
457 let mut connection = get_pool_connection(&reader.get_pool())?;
458 check_db_migration_consistency(&mut connection)?;
459 info!("Compatibility check passed");
460 }
461
462 let db = Db::new(
464 reader.clone(),
465 config.service.limits.clone(),
466 metrics.clone(),
467 );
468 let loader = DataLoader::new(db.clone());
469 let pg_conn_pool = PgManager::new(reader.clone());
470 let package_store = DbPackageStore::new(loader.clone());
471 let resolver = Arc::new(Resolver::new_with_limits(
472 PackageStoreWithLruCache::new(package_store),
473 config.service.limits.package_resolver_limits(),
474 ));
475
476 builder.db_reader = Some(db.clone());
477 builder.resolver = Some(resolver.clone());
478
479 let Some(fullnode_url) = config.tx_exec_full_node.node_rpc_url.as_ref() else {
480 return Err(Error::ServerInit(
481 "No fullnode url found in config".to_string(),
482 ));
483 };
484
485 let graphql_streams =
486 GraphQLStream::new(&config.connection.db_url, reader.clone(), ®istry).await?;
487 let write_api = build_write_api(fullnode_url, reader, indexer_metrics)?;
488
489 builder = builder
490 .context_data(config.service.clone())
491 .context_data(loader)
492 .context_data(db)
493 .context_data(pg_conn_pool)
494 .context_data(resolver)
495 .context_data(write_api)
496 .context_data(iota_names_config)
497 .context_data(zklogin_config)
498 .context_data(metrics.clone())
499 .context_data(config.clone())
500 .context_data(graphql_streams)
501 .context_data(ChainIdentifierCache::default());
502
503 if config.internal_features.feature_gate {
504 builder = builder.extension(FeatureGate);
505 }
506
507 if config.internal_features.logger {
508 builder = builder.extension(Logger::default());
509 }
510
511 if config.internal_features.query_limits_checker {
512 builder = builder.extension(QueryLimitsChecker);
513 }
514
515 if config.internal_features.directive_checker {
516 builder = builder.extension(DirectiveChecker);
517 }
518
519 if config.internal_features.query_timeout {
520 builder = builder.extension(Timeout);
521 }
522
523 if config.internal_features.tracing {
524 builder = builder.extension(Tracing);
525 }
526
527 if config.internal_features.apollo_tracing {
528 builder = builder.extension(ApolloTracing);
529 }
530
531 Ok(builder)
535 }
536}
537
538fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
539 async_graphql::Schema::build(Query, Mutation, Subscription)
540 .register_output_type::<IMoveObject>()
541 .register_output_type::<IObject>()
542 .register_output_type::<IOwner>()
543 .register_output_type::<IMoveDatatype>()
544}
545
546pub fn export_schema() -> String {
548 schema_builder().finish().sdl()
549}
550
551async fn graphql_handler(
556 ConnectInfo(addr): ConnectInfo<SocketAddr>,
557 TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
558 schema: Extension<IotaGraphQLSchema>,
559 Extension(watermark_lock): Extension<WatermarkLock>,
560 headers: HeaderMap,
561 req: GraphQLRequest,
562) -> (axum::http::Extensions, GraphQLResponse) {
563 let mut req = req.into_inner();
564 req.data.insert(PayloadSize(content_length));
565 req.data.insert(Uuid::new_v4());
566 if headers.contains_key(ShowUsage::name()) {
567 req.data.insert(ShowUsage)
568 }
569
570 req.data.insert(addr);
574 req.data.insert(Watermark::new(watermark_lock).await);
575
576 let result = schema.execute(req).await;
577
578 let mut extensions = axum::http::Extensions::new();
581 if result.is_err() {
582 extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
583 };
584 (extensions, result.into())
585}
586
587pub(crate) fn get_write_api<'ctx>(
588 ctx: &'ctx Context<'_>,
589) -> Result<&'ctx OptimisticWriteApi, Error> {
590 ctx.data_opt()
591 .ok_or_else(|| Error::Internal("Unable to get node execution interface".to_string()))
592}
593
594fn build_write_api(
595 fullnode_url: &str,
596 reader: IndexerReader,
597 metrics: IndexerMetrics,
598) -> Result<OptimisticWriteApi, Error> {
599 let json_rpc_client = build_json_rpc_client(fullnode_url)?;
600 let indexer_store = PgIndexerStore::new(reader.get_pool(), metrics.clone());
601 let optimistic_tx_executor =
602 OptimisticTransactionExecutor::new(fullnode_url, reader.clone(), indexer_store, metrics);
603 Ok(OptimisticWriteApi::new(
604 WriteApi::new(json_rpc_client, reader),
605 optimistic_tx_executor,
606 ))
607}
608
609fn build_json_rpc_client(rpc_client_url: &str) -> Result<HttpClient, Error> {
610 let mut headers = HeaderMap::new();
611 headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("graphql"));
612
613 let builder = HttpClientBuilder::default()
614 .max_request_size(2 << 30)
615 .set_headers(headers.clone())
616 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
617 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS);
618
619 builder.build(rpc_client_url).map_err(|e| {
620 warn!("Failed to get new Http client with error: {e:?}");
621 Error::Internal(format!(
622 "Failed to initialize fullnode RPC client with error: {e:?}"
623 ))
624 })
625}
626async fn subscription_handler(
632 ConnectInfo(addr): ConnectInfo<SocketAddr>,
633 Extension(schema): Extension<IotaGraphQLSchema>,
634 req: http::Request<Body>,
635) -> AxumResponse {
636 let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
637 let (mut parts, _body) = req.into_parts();
638
639 let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
641 Ok(protocol) => protocol,
642 Err(err) => return err.into_response(),
643 };
644
645 let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
647 Ok(upgrade) => upgrade,
648 Err(err) => return err.into_response(),
649 };
650
651 let resp = upgrade
652 .protocols(ALL_WEBSOCKET_PROTOCOLS)
653 .on_upgrade(move |stream| async move {
654 let mut connection_data = Data::default();
656 connection_data.insert(PayloadSize(0));
658 connection_data.insert(Uuid::new_v4());
659 if headers_contains_show_usage {
660 connection_data.insert(ShowUsage)
661 }
662 connection_data.insert(addr);
663
664 let connection =
665 GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
666 connection.serve().await;
667 });
668 resp
669}
670
671#[derive(Clone)]
672struct MetricsMakeCallbackHandler {
673 metrics: Metrics,
674}
675
676impl MakeCallbackHandler for MetricsMakeCallbackHandler {
677 type Handler = MetricsCallbackHandler;
678
679 fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
680 let start = Instant::now();
681 let metrics = self.metrics.clone();
682
683 metrics.request_metrics.inflight_requests.inc();
684 metrics.inc_num_queries();
685
686 MetricsCallbackHandler { metrics, start }
687 }
688}
689
690struct MetricsCallbackHandler {
691 metrics: Metrics,
692 start: Instant,
693}
694
695impl ResponseHandler for MetricsCallbackHandler {
696 fn on_response(&mut self, response: &http::response::Parts) {
697 if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
698 self.metrics.inc_errors(&errors.0);
699 }
700 }
701
702 fn on_error<E>(&mut self, _error: &E) {
703 }
708}
709
710impl Drop for MetricsCallbackHandler {
711 fn drop(&mut self) {
712 self.metrics.query_latency(self.start.elapsed());
713 self.metrics.request_metrics.inflight_requests.dec();
714 }
715}
716
717#[derive(Debug, Clone)]
718struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
719
720async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
722 let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
723 return StatusCode::INTERNAL_SERVER_ERROR;
724 };
725
726 let Some(host) = url.host_str() else {
727 return StatusCode::INTERNAL_SERVER_ERROR;
728 };
729
730 let tcp_url = if let Some(port) = url.port() {
731 format!("{host}:{port}")
732 } else {
733 host.to_string()
734 };
735
736 if TcpStream::connect(tcp_url).is_err() {
737 StatusCode::INTERNAL_SERVER_ERROR
738 } else {
739 StatusCode::OK
740 }
741}
742
743#[derive(serde::Deserialize)]
744struct HealthParam {
745 max_checkpoint_lag_ms: Option<u64>,
746}
747
748async fn health_check(
754 State(connection): State<ConnectionConfig>,
755 Extension(watermark_lock): Extension<WatermarkLock>,
756 AxumQuery(query_params): AxumQuery<HealthParam>,
757) -> StatusCode {
758 let db_health_check = db_health_check(axum::extract::State(connection)).await;
759 if db_health_check != StatusCode::OK {
760 return db_health_check;
761 }
762
763 let max_checkpoint_lag_ms = query_params
764 .max_checkpoint_lag_ms
765 .map(Duration::from_millis)
766 .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
767
768 let checkpoint_timestamp =
769 Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);
770
771 let now_millis = Utc::now().timestamp_millis();
772
773 let now: Duration = match u64::try_from(now_millis) {
775 Ok(val) => Duration::from_millis(val),
776 Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
777 };
778
779 if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
780 return StatusCode::GATEWAY_TIMEOUT;
781 }
782
783 db_health_check
784}
785
786async fn get_or_init_server_start_time() -> &'static Instant {
788 static ONCE: OnceCell<Instant> = OnceCell::const_new();
789 ONCE.get_or_init(|| async move { Instant::now() }).await
790}
791
792pub mod tests {
793 use std::{sync::Arc, time::Duration};
794
795 use async_graphql::{
796 Request, Response, Variables,
797 extensions::{Extension, ExtensionContext, NextExecute},
798 };
799 use iota_types::transaction::TransactionData;
800 use serde_json::json;
801 use uuid::Uuid;
802
803 use super::*;
804 use crate::{
805 config::{ConnectionConfig, Limits, ServiceConfig, Version},
806 context_data::db_data_provider::PgManager,
807 extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
808 test_infra::cluster::Cluster,
809 };
810
811 fn prep_schema(
815 connection_config: Option<ConnectionConfig>,
816 service_config: Option<ServiceConfig>,
817 ) -> ServerBuilder {
818 let connection_config = connection_config.unwrap_or_default();
819 let service_config = service_config.unwrap_or_default();
820
821 let reader = PgManager::reader_with_config(
822 connection_config.db_url.clone(),
823 connection_config.db_pool_size,
824 service_config.limits.request_timeout_ms.into(),
825 )
826 .expect("failed to create pg connection pool");
827
828 let version = Version::for_testing();
829 let metrics = metrics();
830 let db = Db::new(
831 reader.clone(),
832 service_config.limits.clone(),
833 metrics.clone(),
834 );
835 let loader = DataLoader::new(db.clone());
836 let pg_conn_pool = PgManager::new(reader);
837 let cancellation_token = CancellationToken::new();
838 let watermark = Watermark {
839 checkpoint: 1,
840 checkpoint_timestamp_ms: 1,
841 epoch: 0,
842 };
843 let state = AppState::new(
844 connection_config.clone(),
845 service_config.clone(),
846 metrics.clone(),
847 cancellation_token.clone(),
848 version,
849 );
850 ServerBuilder::new(state)
851 .context_data(db)
852 .context_data(loader)
853 .context_data(pg_conn_pool)
854 .context_data(service_config)
855 .context_data(query_id())
856 .context_data(ip_address())
857 .context_data(watermark)
858 .context_data(ChainIdentifierCache::default())
859 .context_data(metrics)
860 }
861
862 fn metrics() -> Metrics {
863 let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap();
864 let registry = iota_metrics::start_prometheus_server(binding_address).default_registry();
865 Metrics::new(®istry)
866 }
867
/// Fixed client address (`0.0.0.0:51515`) used as request context data in
/// tests.
fn ip_address() -> SocketAddr {
    SocketAddr::from(([0, 0, 0, 0], 51515))
}
872
873 fn query_id() -> Uuid {
874 Uuid::new_v4()
875 }
876
877 pub async fn test_timeout_impl(cluster: &Cluster) {
878 struct TimedExecuteExt {
879 pub min_req_delay: Duration,
880 }
881
882 impl ExtensionFactory for TimedExecuteExt {
883 fn create(&self) -> Arc<dyn Extension> {
884 Arc::new(TimedExecuteExt {
885 min_req_delay: self.min_req_delay,
886 })
887 }
888 }
889
890 #[async_trait::async_trait]
891 impl Extension for TimedExecuteExt {
892 async fn execute(
893 &self,
894 ctx: &ExtensionContext<'_>,
895 operation_name: Option<&str>,
896 next: NextExecute<'_>,
897 ) -> Response {
898 tokio::time::sleep(self.min_req_delay).await;
899 next.run(ctx, operation_name).await
900 }
901 }
902
903 async fn test_timeout(
904 delay: Duration,
905 timeout: Duration,
906 query: &str,
907 write_api: OptimisticWriteApi,
908 ) -> Response {
909 let mut cfg = ServiceConfig::default();
910 cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
911 cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
912
913 let schema = prep_schema(None, Some(cfg))
914 .context_data(write_api)
915 .extension(Timeout)
916 .extension(TimedExecuteExt {
917 min_req_delay: delay,
918 })
919 .build_schema();
920
921 schema.execute(query).await
922 }
923
924 let wallet = &cluster.validator_fullnode_handle.wallet;
925 let store = &cluster.indexer_store;
926 let fn_rpc_url = &cluster.validator_fullnode_handle.fullnode_handle.rpc_url;
927 let indexer_metrics = store.get_metrics();
928 let indexer_reader = iota_indexer::read::IndexerReader::new(store.blocking_cp());
929 let json_rpc_client = build_json_rpc_client(fn_rpc_url).unwrap();
930
931 let optimistic_tx_executor =
932 iota_indexer::optimistic_indexing::OptimisticTransactionExecutor::new(
933 fn_rpc_url,
934 indexer_reader.clone(),
935 store.clone(),
936 indexer_metrics,
937 );
938 let write_api = OptimisticWriteApi::new(
939 WriteApi::new(json_rpc_client, indexer_reader),
940 optimistic_tx_executor,
941 );
942
943 let query = "{ chainIdentifier }";
944 let timeout = Duration::from_millis(1000);
945 let delay = Duration::from_millis(100);
946
947 test_timeout(delay, timeout, query, write_api.clone())
948 .await
949 .into_result()
950 .expect("should complete successfully");
951
952 let errs: Vec<_> = test_timeout(delay, delay, query, write_api.clone())
954 .await
955 .into_result()
956 .unwrap_err()
957 .into_iter()
958 .map(|e| e.message)
959 .collect();
960 let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
961 assert_eq!(errs, vec![exp]);
962
963 let addresses = wallet.get_addresses();
967 let gas = wallet
968 .get_one_gas_object_owned_by_address(addresses[0])
969 .await
970 .unwrap();
971 let tx_data = TransactionData::new_transfer_iota(
972 addresses[1],
973 addresses[0],
974 Some(1000),
975 gas.unwrap(),
976 1_000_000,
977 wallet.get_reference_gas_price().await.unwrap(),
978 );
979
980 let tx = wallet.sign_transaction(&tx_data);
981 let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
982
983 let signature_base64 = &signatures[0];
984 let query = format!(
985 r#"
986 mutation {{
987 executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
988 effects {{
989 status
990 }}
991 }}
992 }}"#,
993 tx_bytes.encoded(),
994 signature_base64.encoded()
995 );
996 let errs: Vec<_> = test_timeout(delay, delay, &query, write_api.clone())
997 .await
998 .into_result()
999 .unwrap_err()
1000 .into_iter()
1001 .map(|e| e.message)
1002 .collect();
1003 let exp = format!(
1004 "Mutation request timed out. Limit: {}s",
1005 delay.as_secs_f32()
1006 );
1007 assert_eq!(errs, vec![exp]);
1008 }
1009
1010 pub async fn test_query_depth_limit_impl() {
1011 async fn exec_query_depth_limit(depth: u32, query: &str) -> Response {
1012 let service_config = ServiceConfig {
1013 limits: Limits {
1014 max_query_depth: depth,
1015 ..Default::default()
1016 },
1017 ..Default::default()
1018 };
1019
1020 let schema = prep_schema(None, Some(service_config))
1021 .context_data(PayloadSize(100))
1022 .extension(QueryLimitsChecker)
1023 .build_schema();
1024 schema.execute(query).await
1025 }
1026
1027 exec_query_depth_limit(1, "{ chainIdentifier }")
1028 .await
1029 .into_result()
1030 .expect("should complete successfully");
1031
1032 exec_query_depth_limit(
1033 5,
1034 "{ chainIdentifier protocolConfig { configs { value key }} }",
1035 )
1036 .await
1037 .into_result()
1038 .expect("should complete successfully");
1039
1040 let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }")
1042 .await
1043 .into_result()
1044 .unwrap_err()
1045 .into_iter()
1046 .map(|e| e.message)
1047 .collect();
1048
1049 assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
1050 let errs: Vec<_> = exec_query_depth_limit(
1051 2,
1052 "{ chainIdentifier protocolConfig { configs { value key }} }",
1053 )
1054 .await
1055 .into_result()
1056 .unwrap_err()
1057 .into_iter()
1058 .map(|e| e.message)
1059 .collect();
1060 assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
1061 }
1062
1063 pub async fn test_query_node_limit_impl() {
1064 async fn exec_query_node_limit(nodes: u32, query: &str) -> Response {
1065 let service_config = ServiceConfig {
1066 limits: Limits {
1067 max_query_nodes: nodes,
1068 ..Default::default()
1069 },
1070 ..Default::default()
1071 };
1072
1073 let schema = prep_schema(None, Some(service_config))
1074 .context_data(PayloadSize(100))
1075 .extension(QueryLimitsChecker)
1076 .build_schema();
1077 schema.execute(query).await
1078 }
1079
1080 exec_query_node_limit(1, "{ chainIdentifier }")
1081 .await
1082 .into_result()
1083 .expect("should complete successfully");
1084
1085 exec_query_node_limit(
1086 5,
1087 "{ chainIdentifier protocolConfig { configs { value key }} }",
1088 )
1089 .await
1090 .into_result()
1091 .expect("should complete successfully");
1092
1093 let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }")
1095 .await
1096 .into_result()
1097 .unwrap_err()
1098 .into_iter()
1099 .map(|e| e.message)
1100 .collect();
1101 assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1102
1103 let err: Vec<_> = exec_query_node_limit(
1104 4,
1105 "{ chainIdentifier protocolConfig { configs { value key }} }",
1106 )
1107 .await
1108 .into_result()
1109 .unwrap_err()
1110 .into_iter()
1111 .map(|e| e.message)
1112 .collect();
1113 assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1114 }
1115
1116 pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) {
1117 let service_config = ServiceConfig {
1118 limits: Limits {
1119 default_page_size: 1,
1120 ..Default::default()
1121 },
1122 ..Default::default()
1123 };
1124 let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema();
1125
1126 let resp = schema
1127 .execute("{ checkpoints { nodes { sequenceNumber } } }")
1128 .await;
1129 let data = resp.data.clone().into_json().unwrap();
1130 let checkpoints = data
1131 .get("checkpoints")
1132 .unwrap()
1133 .get("nodes")
1134 .unwrap()
1135 .as_array()
1136 .unwrap();
1137 assert_eq!(
1138 checkpoints.len(),
1139 1,
1140 "Checkpoints should have exactly one element"
1141 );
1142
1143 let resp = schema
1144 .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1145 .await;
1146 let data = resp.data.clone().into_json().unwrap();
1147 let checkpoints = data
1148 .get("checkpoints")
1149 .unwrap()
1150 .get("nodes")
1151 .unwrap()
1152 .as_array()
1153 .unwrap();
1154 assert_eq!(
1155 checkpoints.len(),
1156 2,
1157 "Checkpoints should return two elements"
1158 );
1159 }
1160
1161 pub async fn test_query_max_page_limit_impl() {
1162 let schema = prep_schema(None, None).build_schema();
1163
1164 schema
1165 .execute("{ objects(first: 1) { nodes { version } } }")
1166 .await
1167 .into_result()
1168 .expect("should complete successfully");
1169
1170 let err: Vec<_> = schema
1172 .execute("{ objects(first: 51) { nodes { version } } }")
1173 .await
1174 .into_result()
1175 .unwrap_err()
1176 .into_iter()
1177 .map(|e| e.message)
1178 .collect();
1179 assert_eq!(
1180 err,
1181 vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1182 );
1183 }
1184
1185 pub async fn test_query_complexity_metrics_impl() {
1186 let server_builder = prep_schema(None, None).context_data(PayloadSize(100));
1187 let metrics = server_builder.state.metrics.clone();
1188 let schema = server_builder
1189 .extension(QueryLimitsChecker) .build_schema();
1191
1192 schema
1193 .execute("{ chainIdentifier }")
1194 .await
1195 .into_result()
1196 .expect("should complete successfully");
1197
1198 let req_metrics = metrics.request_metrics;
1199 assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1200 assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1201 assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1202 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1203 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1204 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1205
1206 schema
1207 .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1208 .await
1209 .into_result()
1210 .expect("should complete successfully");
1211
1212 assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1213 assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1214 assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1215 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1216 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1217 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1218 }
1219
1220 pub async fn test_health_check_impl() {
1221 let server_builder = prep_schema(None, None);
1222 let url = format!(
1223 "http://{}:{}/health",
1224 server_builder.state.connection.host, server_builder.state.connection.port
1225 );
1226 server_builder.build_schema();
1227
1228 let resp = reqwest::get(&url).await.unwrap();
1229 assert_eq!(resp.status(), reqwest::StatusCode::OK);
1230
1231 let url_with_param = format!("{url}?max_checkpoint_lag_ms=1");
1232 let resp = reqwest::get(&url_with_param).await.unwrap();
1233 assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1234 }
1235
1236 async fn execute_for_error(limits: Limits, request: Request) -> String {
1239 let service_config = ServiceConfig {
1240 limits,
1241 ..Default::default()
1242 };
1243
1244 let schema = prep_schema(None, Some(service_config))
1245 .context_data(PayloadSize(
1246 serde_json::to_string(&request).unwrap().len() as u64,
1251 ))
1252 .extension(QueryLimitsChecker)
1253 .build_schema();
1254
1255 let errs: Vec<_> = schema
1256 .execute(request)
1257 .await
1258 .into_result()
1259 .unwrap_err()
1260 .into_iter()
1261 .map(|e| e.message)
1262 .collect();
1263
1264 errs.join("\n")
1265 }
1266
1267 pub async fn test_payload_read_exceeded_impl() {
1268 assert_eq!(
1269 execute_for_error(
1270 Limits {
1271 max_tx_payload_size: 400,
1272 max_query_payload_size: 10,
1273 ..Default::default()
1274 },
1275 r#"
1276 mutation {
1277 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1278 effects {
1279 status
1280 }
1281 }
1282 }
1283 "#
1284 .into()
1285 )
1286 .await,
1287 "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1288 transaction payloads (all inputs to executeTransactionBlock or \
1289 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 \
1290 bytes or fewer."
1291 );
1292 }
1293
1294 pub async fn test_payload_mutation_exceeded_impl() {
1295 assert_eq!(
1296 execute_for_error(
1297 Limits {
1298 max_tx_payload_size: 10,
1299 max_query_payload_size: 400,
1300 ..Default::default()
1301 },
1302 r#"
1303 mutation {
1304 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1305 effects {
1306 status
1307 }
1308 }
1309 }
1310 "#
1311 .into()
1312 )
1313 .await,
1314 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1315 transaction payloads (all inputs to executeTransactionBlock or \
1316 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1317 bytes or fewer."
1318 );
1319 }
1320
1321 pub async fn test_payload_dry_run_exceeded_impl() {
1322 assert_eq!(
1323 execute_for_error(
1324 Limits {
1325 max_tx_payload_size: 10,
1326 max_query_payload_size: 400,
1327 ..Default::default()
1328 },
1329 r#"
1330 query {
1331 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1332 error
1333 transaction {
1334 digest
1335 }
1336 }
1337 }
1338 "#
1339 .into(),
1340 )
1341 .await,
1342 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1343 transaction payloads (all inputs to executeTransactionBlock or \
1344 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 bytes \
1345 or fewer."
1346 );
1347 }
1348
1349 pub async fn test_payload_total_exceeded_impl() {
1350 assert_eq!(
1351 execute_for_error(
1352 Limits {
1353 max_tx_payload_size: 10,
1354 max_query_payload_size: 10,
1355 ..Default::default()
1356 },
1357 r#"
1358 query {
1359 dryRunTransactionBlock(txByte: "AAABBB") {
1360 error
1361 transaction {
1362 digest
1363 }
1364 }
1365 }
1366 "#
1367 .into(),
1368 )
1369 .await,
1370 "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1371 transaction payloads (all inputs to executeTransactionBlock or dryRunTransactionBlock) \
1372 and the rest of the request (the query part) must be 10 bytes or fewer."
1373 );
1374 }
1375
1376 pub async fn test_payload_using_vars_mutation_exceeded_impl() {
1377 assert_eq!(
1378 execute_for_error(
1379 Limits {
1380 max_tx_payload_size: 10,
1381 max_query_payload_size: 500,
1382 ..Default::default()
1383 },
1384 Request::new(
1385 r#"
1386 mutation ($tx: String!, $sigs: [String!]!) {
1387 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1388 effects {
1389 status
1390 }
1391 }
1392 }
1393 "#
1394 )
1395 .variables(Variables::from_json(json!({
1396 "tx": "AAABBBCCC",
1397 "sigs": ["BBB"]
1398 })))
1399 )
1400 .await,
1401 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1402 transaction payloads (all inputs to executeTransactionBlock or \
1403 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1404 bytes or fewer."
1405 );
1406 }
1407
1408 pub async fn test_payload_using_vars_read_exceeded_impl() {
1409 assert_eq!(
1410 execute_for_error(
1411 Limits {
1412 max_tx_payload_size: 500,
1413 max_query_payload_size: 10,
1414 ..Default::default()
1415 },
1416 Request::new(
1417 r#"
1418 mutation ($tx: String!, $sigs: [String!]!) {
1419 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1420 effects {
1421 status
1422 }
1423 }
1424 }
1425 "#
1426 )
1427 .variables(Variables::from_json(json!({
1428 "tx": "AAA",
1429 "sigs": ["BBB"]
1430 })))
1431 )
1432 .await,
1433 "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1434 transaction payloads (all inputs to executeTransactionBlock or \
1435 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1436 or fewer."
1437 );
1438 }
1439
1440 pub async fn test_payload_using_vars_dry_run_exceeded_impl() {
1441 assert_eq!(
1442 execute_for_error(
1443 Limits {
1444 max_tx_payload_size: 10,
1445 max_query_payload_size: 400,
1446 ..Default::default()
1447 },
1448 Request::new(
1449 r#"
1450 query ($tx: String!) {
1451 dryRunTransactionBlock(txBytes: $tx) {
1452 error
1453 transaction {
1454 digest
1455 }
1456 }
1457 }
1458 "#
1459 )
1460 .variables(Variables::from_json(json!({
1461 "tx": "AAABBBCCC"
1462 }))),
1463 )
1464 .await,
1465 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1466 transaction payloads (all inputs to executeTransactionBlock or \
1467 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1468 bytes or fewer."
1469 );
1470 }
1471
1472 pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() {
1473 assert_eq!(
1474 execute_for_error(
1475 Limits {
1476 max_tx_payload_size: 400,
1477 max_query_payload_size: 10,
1478 ..Default::default()
1479 },
1480 Request::new(
1481 r#"
1482 query ($tx: String!) {
1483 dryRunTransactionBlock(txBytes: $tx) {
1484 error
1485 transaction {
1486 digest
1487 }
1488 }
1489 }
1490 "#
1491 )
1492 .variables(Variables::from_json(json!({
1493 "tx": "AAABBBCCC"
1494 }))),
1495 )
1496 .await,
1497 "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1498 transaction payloads (all inputs to executeTransactionBlock or \
1499 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1500 or fewer."
1501 );
1502 }
1503
1504 pub async fn test_payload_multiple_execution_exceeded_impl() {
1505 let err = execute_for_error(
1508 Limits {
1509 max_tx_payload_size: 30,
1510 max_query_payload_size: 320,
1511 ..Default::default()
1512 },
1513 r#"
1514 mutation {
1515 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1516 effects {
1517 status
1518 }
1519 }
1520 }
1521 "#
1522 .into(),
1523 )
1524 .await;
1525 assert!(err.starts_with("Query part too large"), "{err}");
1526
1527 assert_eq!(
1528 execute_for_error(
1529 Limits {
1530 max_tx_payload_size: 30,
1531 max_query_payload_size: 800,
1532 ..Default::default()
1533 },
1534 r#"
1535 mutation {
1536 e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1537 effects {
1538 status
1539 }
1540 }
1541 e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1542 effects {
1543 status
1544 }
1545 }
1546 }
1547 "#
1548 .into()
1549 )
1550 .await,
1551 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1552 transaction payloads (all inputs to executeTransactionBlock or \
1553 dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1554 bytes or fewer."
1555 );
1556 }
1557
1558 pub async fn test_payload_multiple_dry_run_exceeded_impl() {
1559 let err = execute_for_error(
1562 Limits {
1563 max_tx_payload_size: 20,
1564 max_query_payload_size: 330,
1565 ..Default::default()
1566 },
1567 r#"
1568 query {
1569 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1570 error
1571 transaction {
1572 digest
1573 }
1574 }
1575 }
1576 "#
1577 .into(),
1578 )
1579 .await;
1580 assert!(err.starts_with("Query part too large"), "{err}");
1581
1582 assert_eq!(
1583 execute_for_error(
1584 Limits {
1585 max_tx_payload_size: 20,
1586 max_query_payload_size: 800,
1587 ..Default::default()
1588 },
1589 r#"
1590 query {
1591 d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1592 error
1593 transaction {
1594 digest
1595 }
1596 }
1597 d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1598 error
1599 transaction {
1600 digest
1601 }
1602 }
1603 }
1604 "#
1605 .into()
1606 )
1607 .await,
1608 "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1609 transaction payloads (all inputs to executeTransactionBlock or \
1610 dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1611 bytes or fewer."
1612 );
1613 }
1614
1615 pub async fn test_payload_execution_multiple_sigs_exceeded_impl() {
1616 let err = execute_for_error(
1619 Limits {
1620 max_tx_payload_size: 30,
1621 max_query_payload_size: 320,
1622 ..Default::default()
1623 },
1624 r#"
1625 mutation {
1626 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1627 effects {
1628 status
1629 }
1630 }
1631 }
1632 "#
1633 .into(),
1634 )
1635 .await;
1636
1637 assert!(err.starts_with("Query part too large"), "{err}");
1638
1639 assert_eq!(
1640 execute_for_error(
1641 Limits {
1642 max_tx_payload_size: 30,
1643 max_query_payload_size: 500,
1644 ..Default::default()
1645 },
1646 r#"
1647 mutation {
1648 executeTransactionBlock(
1649 txBytes: "AAA",
1650 signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1651 ) {
1652 effects {
1653 status
1654 }
1655 }
1656 }
1657 "#
1658 .into(),
1659 )
1660 .await,
1661 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1662 transaction payloads (all inputs to executeTransactionBlock or \
1663 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1664 bytes or fewer.",
1665 )
1666 }
1667
1668 pub async fn test_payload_sig_var_execution_exceeded_impl() {
1669 assert_eq!(
1672 execute_for_error(
1673 Limits {
1674 max_tx_payload_size: 10,
1675 max_query_payload_size: 500,
1676 ..Default::default()
1677 },
1678 Request::new(
1679 r#"
1680 mutation ($tx: String!, $sig: String!) {
1681 executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1682 effects {
1683 status
1684 }
1685 }
1686 }
1687 "#
1688 )
1689 .variables(Variables::from_json(json!({
1690 "tx": "AAA",
1691 "sig": "BBB"
1692 })))
1693 )
1694 .await,
1695 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1696 transaction payloads (all inputs to executeTransactionBlock or \
1697 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1698 bytes or fewer."
1699 );
1700 }
1701
/// Returns `true` when `err` does not begin with either of the messages reported for an
/// oversized request or an oversized transaction payload.
fn passed_tx_checks(err: &str) -> bool {
    const TX_CHECK_FAILURES: [&str; 2] = [
        "Overall request too large",
        "Transaction payload too large",
    ];

    !TX_CHECK_FAILURES
        .iter()
        .any(|&prefix| err.starts_with(prefix))
}
1708
1709 pub async fn test_payload_reusing_vars_execution_impl() {
1710 assert!(!passed_tx_checks(
1716 &execute_for_error(
1717 Limits {
1718 max_tx_payload_size: 1,
1719 max_query_payload_size: 1,
1720 ..Default::default()
1721 },
1722 r#"
1723 mutation {
1724 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1725 effects {
1726 status
1727 }
1728 }
1729 }
1730 "#
1731 .into()
1732 )
1733 .await
1734 ));
1735
1736 let limits = Limits {
1737 max_tx_payload_size: 20,
1738 max_query_payload_size: 1000,
1739 ..Default::default()
1740 };
1741
1742 assert!(passed_tx_checks(
1745 &execute_for_error(
1746 limits.clone(),
1747 Request::new(
1748 r#"
1749 mutation ($sig: String!) {
1750 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1751 effects {
1752 status
1753 }
1754 }
1755 }
1756 "#,
1757 )
1758 .variables(Variables::from_json(json!({
1759 "sig": "BBB"
1760 })))
1761 )
1762 .await
1763 ));
1764
1765 let execution_result = execute_for_error(
1768 limits.clone(),
1769 Request::new(
1770 r#"
1771 mutation ($sig: String!) {
1772 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1773 effects {
1774 status
1775 }
1776 }
1777 }
1778 "#,
1779 )
1780 .variables(Variables::from_json(json!({
1781 "sig": "BBB"
1782 }))),
1783 )
1784 .await;
1785 assert!(!passed_tx_checks(&execution_result), "{execution_result}");
1786
1787 let execution_result = execute_for_error(
1790 limits,
1791 Request::new(
1792 r#"
1793 mutation ($sig: String!) {
1794 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1795 effects {
1796 status
1797 }
1798 }
1799 }
1800 "#,
1801 )
1802 .variables(Variables::from_json(json!({
1803 "sig": "BBB"
1804 }))),
1805 )
1806 .await;
1807 assert!(passed_tx_checks(&execution_result), "{execution_result}");
1808 }
1809
1810 pub async fn test_payload_reusing_vars_dry_run_impl() {
1811 let limits = Limits {
1815 max_tx_payload_size: 20,
1816 max_query_payload_size: 1000,
1817 ..Default::default()
1818 };
1819
1820 assert!(passed_tx_checks(
1822 &execute_for_error(
1823 limits.clone(),
1824 Request::new(
1825 r#"
1826 query ($tx: String!) {
1827 dryRunTransactionBlock(txBytes: $tx) {
1828 error
1829 transaction {
1830 digest
1831 }
1832 }
1833 }
1834 "#,
1835 )
1836 .variables(Variables::from_json(json!({
1837 "tx": "AAABBBCCC"
1838 })))
1839 )
1840 .await
1841 ));
1842
1843 assert!(!passed_tx_checks(
1845 &execute_for_error(
1846 limits.clone(),
1847 Request::new(
1848 r#"
1849 query ($tx: String!) {
1850 d0: dryRunTransactionBlock(txBytes: $tx) {
1851 error
1852 transaction {
1853 digest
1854 }
1855 }
1856
1857 d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1858 error
1859 transaction {
1860 digest
1861 }
1862 }
1863 }
1864 "#,
1865 )
1866 .variables(Variables::from_json(json!({
1867 "tx": "AAABBBCCC"
1868 })))
1869 )
1870 .await
1871 ));
1872
1873 assert!(passed_tx_checks(
1875 &execute_for_error(
1876 limits,
1877 Request::new(
1878 r#"
1879 query ($tx: String!) {
1880 d0: dryRunTransactionBlock(txBytes: $tx) {
1881 error
1882 transaction {
1883 digest
1884 }
1885 }
1886
1887 d1: dryRunTransactionBlock(txBytes: $tx) {
1888 error
1889 transaction {
1890 digest
1891 }
1892 }
1893 }
1894 "#,
1895 )
1896 .variables(Variables::from_json(json!({
1897 "tx": "AAABBBCCC"
1898 })))
1899 )
1900 .await
1901 ));
1902 }
1903
1904 pub async fn test_payload_named_fragment_execution_exceeded_impl() {
1905 assert_eq!(
1906 execute_for_error(
1907 Limits {
1908 max_tx_payload_size: 10,
1909 max_query_payload_size: 500,
1910 ..Default::default()
1911 },
1912 r#"
1913 mutation {
1914 ...Tx
1915 }
1916
1917 fragment Tx on Mutation {
1918 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1919 effects {
1920 status
1921 }
1922 }
1923 }
1924 "#
1925 .into()
1926 )
1927 .await,
1928 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1929 transaction payloads (all inputs to executeTransactionBlock or \
1930 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1931 bytes or fewer."
1932 );
1933 }
1934
1935 pub async fn test_payload_inline_fragment_execution_exceeded_impl() {
1936 assert_eq!(
1937 execute_for_error(
1938 Limits {
1939 max_tx_payload_size: 10,
1940 max_query_payload_size: 500,
1941 ..Default::default()
1942 },
1943 r#"
1944 mutation {
1945 ... on Mutation {
1946 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1947 effects {
1948 status
1949 }
1950 }
1951 }
1952 }
1953 "#
1954 .into()
1955 )
1956 .await,
1957 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1958 transaction payloads (all inputs to executeTransactionBlock or \
1959 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1960 bytes or fewer."
1961 );
1962 }
1963
1964 pub async fn test_payload_named_fragment_dry_run_exceeded_impl() {
1965 assert_eq!(
1966 execute_for_error(
1967 Limits {
1968 max_tx_payload_size: 10,
1969 max_query_payload_size: 500,
1970 ..Default::default()
1971 },
1972 r#"
1973 query {
1974 ...DryRun
1975 }
1976
1977 fragment DryRun on Query {
1978 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1979 error
1980 transaction {
1981 digest
1982 }
1983 }
1984 }
1985 "#
1986 .into(),
1987 )
1988 .await,
1989 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1990 transaction payloads (all inputs to executeTransactionBlock or \
1991 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1992 bytes or fewer."
1993 );
1994 }
1995
1996 pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() {
1997 assert_eq!(
1998 execute_for_error(
1999 Limits {
2000 max_tx_payload_size: 10,
2001 max_query_payload_size: 500,
2002 ..Default::default()
2003 },
2004 r#"
2005 query {
2006 ... on Query {
2007 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2008 error
2009 transaction {
2010 digest
2011 }
2012 }
2013 }
2014 }
2015 "#
2016 .into(),
2017 )
2018 .await,
2019 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2020 transaction payloads (all inputs to executeTransactionBlock or \
2021 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
2022 bytes or fewer."
2023 );
2024 }
2025}