1use std::{
5 any::Any,
6 convert::Infallible,
7 net::{SocketAddr, TcpStream},
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_graphql::{
13 Context, Data, Schema, SchemaBuilder,
14 extensions::{ApolloTracing, ExtensionFactory, Tracing},
15 http::ALL_WEBSOCKET_PROTOCOLS,
16};
17use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
18use axum::{
19 Extension, Router,
20 body::Body,
21 extract::{
22 ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
23 },
24 http::{HeaderMap, StatusCode},
25 middleware::{self},
26 response::{IntoResponse, Response as AxumResponse},
27 routing::{MethodRouter, Route, get, post},
28};
29use axum_extra::{TypedHeader, headers::ContentLength};
30use chrono::Utc;
31use http::{HeaderValue, Method, Request};
32use iota_graphql_rpc_headers::LIMITS_HEADER;
33use iota_indexer::{
34 apis::{OptimisticWriteApi, WriteApi},
35 db::{get_pool_connection, setup_postgres::check_db_migration_consistency},
36 metrics::IndexerMetrics,
37 optimistic_indexing::OptimisticTransactionExecutor,
38 read::IndexerReader,
39 store::PgIndexerStore,
40};
41use iota_json_rpc_api::CLIENT_SDK_TYPE_HEADER;
42use iota_metrics::spawn_monitored_task;
43use iota_network_stack::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
44use iota_package_resolver::{PackageStoreWithLruCache, Resolver};
45use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
46use tokio::{join, net::TcpListener, sync::OnceCell};
47use tokio_util::sync::CancellationToken;
48use tower::{Layer, Service};
49use tower_http::cors::{AllowOrigin, CorsLayer};
50use tracing::{info, warn};
51use uuid::Uuid;
52
53use crate::{
54 config::{
55 ConnectionConfig, MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
56 ServerConfig, ServiceConfig, Version,
57 },
58 context_data::db_data_provider::PgManager,
59 data::{
60 DataLoader, Db,
61 package_resolver::{DbPackageStore, PackageResolver},
62 },
63 error::Error,
64 extensions::{
65 directive_checker::DirectiveChecker,
66 feature_gate::FeatureGate,
67 logger::Logger,
68 query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
69 timeout::Timeout,
70 },
71 metrics::Metrics,
72 mutation::Mutation,
73 server::{
74 exchange_rates_task::TriggerExchangeRatesTask,
75 system_package_task::SystemPackageTask,
76 version::{check_version_middleware, set_version_middleware},
77 watermark_task::{Watermark, WatermarkLock, WatermarkTask},
78 },
79 types::{
80 chain_identifier::ChainIdentifierCache,
81 datatype::IMoveDatatype,
82 move_object::IMoveObject,
83 object::IObject,
84 owner::IOwner,
85 query::{IotaGraphQLSchema, Query},
86 subscription::{GraphQLStream, Subscription},
87 },
88};
89
/// Checkpoint lag tolerated by `health_check` when the request does not provide
/// `max_checkpoint_lag_ms`.
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);
/// Complexity limit passed to `limit_complexity` when the schema builder is created.
const MAX_COMPLEXITY: usize = 6000;
95
/// A fully assembled GraphQL service: the HTTP router plus the background tasks that
/// `Server::run` spawns alongside it.
pub(crate) struct Server {
    /// Router served by axum.
    router: Router,
    /// Socket address the HTTP listener binds to.
    address: SocketAddr,
    /// Background task spawned by `run` ("watermark update task").
    watermark_task: WatermarkTask,
    /// Background task spawned by `run` ("system package task").
    system_package_task: SystemPackageTask,
    /// Background task spawned by `run` ("trigger exchange rates task").
    trigger_exchange_rates_task: TriggerExchangeRatesTask,
    /// Shared application state; `run` reads the cancellation token from it.
    state: AppState,
}
104
105impl Server {
106 pub async fn run(mut self) -> Result<(), Error> {
110 get_or_init_server_start_time().await;
111
112 let watermark_task = {
116 info!("Starting watermark update task");
117 spawn_monitored_task!(async move {
118 self.watermark_task.run().await;
119 })
120 };
121
122 let system_package_task = {
125 info!("Starting system package task");
126 spawn_monitored_task!(async move {
127 self.system_package_task.run().await;
128 })
129 };
130
131 let trigger_exchange_rates_task = {
132 info!("Starting trigger exchange rates task");
133 spawn_monitored_task!(async move {
134 self.trigger_exchange_rates_task.run().await;
135 })
136 };
137
138 let server_task = {
139 info!("Starting graphql service");
140 let cancellation_token = self.state.cancellation_token.clone();
141 let address = self.address;
142 let router = self.router;
143 spawn_monitored_task!(async move {
144 axum::serve(
145 TcpListener::bind(address)
146 .await
147 .map_err(|e| Error::Internal(format!("listener bind failed: {e}")))?,
148 router.into_make_service_with_connect_info::<SocketAddr>(),
149 )
150 .with_graceful_shutdown(async move {
151 cancellation_token.cancelled().await;
152 info!("Shutdown signal received, terminating graphql service");
153 })
154 .await
155 .map_err(|e| Error::Internal(format!("Server run failed: {e}")))
156 })
157 };
158
159 let _ = join!(
163 watermark_task,
164 system_package_task,
165 trigger_exchange_rates_task,
166 server_task
167 );
168
169 Ok(())
170 }
171}
172
/// Incrementally assembles a `Server`: schema data/extensions, router and the handles
/// needed by the background tasks.
pub(crate) struct ServerBuilder {
    /// Shared application state handed to the router and the final `Server`.
    state: AppState,
    /// Schema under construction; finished by `build_schema` / `build_components`.
    schema: SchemaBuilder<Query, Mutation, Subscription>,
    /// Lazily created by `init_router`; `None` until a route or layer is added.
    router: Option<Router>,
    /// Database handle; must be set before `build` (it panics otherwise).
    db_reader: Option<Db>,
    /// Package resolver; must be set before `build` (it panics otherwise).
    resolver: Option<PackageResolver>,
}
180
/// State shared with the router (`with_state`) and carried by the `Server`.
#[derive(Clone)]
pub(crate) struct AppState {
    /// Connection settings (host, port, database URL, ...).
    connection: ConnectionConfig,
    /// Service-level configuration (limits, background task settings, ...).
    service: ServiceConfig,
    /// Metrics handle used by the request callback layer.
    metrics: Metrics,
    /// Token whose cancellation triggers the graceful shutdown of the HTTP server.
    cancellation_token: CancellationToken,
    /// Version passed as state to the version middlewares.
    pub version: Version,
}
189
190impl AppState {
191 pub(crate) fn new(
192 connection: ConnectionConfig,
193 service: ServiceConfig,
194 metrics: Metrics,
195 cancellation_token: CancellationToken,
196 version: Version,
197 ) -> Self {
198 Self {
199 connection,
200 service,
201 metrics,
202 cancellation_token,
203 version,
204 }
205 }
206}
207
208impl FromRef<AppState> for ConnectionConfig {
209 fn from_ref(app_state: &AppState) -> ConnectionConfig {
210 app_state.connection.clone()
211 }
212}
213
214impl FromRef<AppState> for Metrics {
215 fn from_ref(app_state: &AppState) -> Metrics {
216 app_state.metrics.clone()
217 }
218}
219
220impl ServerBuilder {
221 pub fn new(state: AppState) -> Self {
222 Self {
223 state,
224 schema: schema_builder(),
225 router: None,
226 db_reader: None,
227 resolver: None,
228 }
229 }
230
231 pub fn address(&self) -> String {
232 format!(
233 "{}:{}",
234 self.state.connection.host, self.state.connection.port
235 )
236 }
237
238 pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
239 self.schema = self.schema.data(context_data);
240 self
241 }
242
243 pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
244 self.schema = self.schema.extension(extension);
245 self
246 }
247
248 fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
249 self.schema.finish()
250 }
251
252 fn build_components(
255 self,
256 ) -> (
257 String,
258 Schema<Query, Mutation, Subscription>,
259 Db,
260 PackageResolver,
261 Router,
262 ) {
263 let address = self.address();
264 let ServerBuilder {
265 state: _,
266 schema,
267 db_reader,
268 resolver,
269 router,
270 } = self;
271 (
272 address,
273 schema.finish(),
274 db_reader.expect("db reader not initialized"),
275 resolver.expect("package resolver not initialized"),
276 router.expect("router not initialized"),
277 )
278 }
279
280 fn init_router(&mut self) {
281 if self.router.is_none() {
282 let router: Router = Router::new()
283 .route("/", post(graphql_handler))
284 .route("/subscriptions", get(subscription_handler))
285 .route("/{version}", post(graphql_handler))
286 .route("/{version}/subscriptions", get(subscription_handler))
287 .route("/graphql", post(graphql_handler))
288 .route("/graphql/subscriptions", get(subscription_handler))
289 .route("/graphql/{version}", post(graphql_handler))
290 .route(
291 "/graphql/{version}/subscriptions",
292 get(subscription_handler),
293 )
294 .route("/health", get(health_check))
295 .route("/graphql/health", get(health_check))
296 .route("/graphql/{version}/health", get(health_check))
297 .with_state(self.state.clone())
298 .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
299 metrics: self.state.metrics.clone(),
300 }));
301 self.router = Some(router);
302 }
303 }
304
305 pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
306 self.init_router();
307 self.router = self.router.map(|router| router.route(path, method_handler));
308 self
309 }
310
311 pub fn layer<L>(mut self, layer: L) -> Self
312 where
313 L: Layer<Route> + Clone + Send + Sync + 'static,
314 L::Service: Service<Request<Body>> + Clone + Send + Sync + 'static,
315 <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
316 <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
317 <L::Service as Service<Request<Body>>>::Future: Send + 'static,
318 {
319 self.init_router();
320 self.router = self.router.map(|router| router.layer(layer));
321 self
322 }
323
324 fn cors() -> Result<CorsLayer, Error> {
325 let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
326 Ok(value) => {
327 let allow_hosts = value
328 .split(',')
329 .map(HeaderValue::from_str)
330 .collect::<Result<Vec<_>, _>>()
331 .map_err(|_| {
332 Error::Internal(
333 "Cannot resolve access control origin env variable".to_string(),
334 )
335 })?;
336 AllowOrigin::list(allow_hosts)
337 }
338 _ => AllowOrigin::any(),
339 };
340 info!("Access control allow origin set to: {acl:?}");
341
342 let cors = CorsLayer::new()
343 .allow_methods([Method::POST, Method::GET])
345 .allow_origin(acl)
347 .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
348 Ok(cors)
349 }
350
351 pub fn build(self) -> Result<Server, Error> {
353 let state = self.state.clone();
354 let (address, schema, db_reader, resolver, router) = self.build_components();
355
356 let watermark_task = WatermarkTask::new(
358 db_reader.clone(),
359 state.metrics.clone(),
360 std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
361 state.cancellation_token.clone(),
362 );
363
364 let system_package_task = SystemPackageTask::new(
365 resolver,
366 watermark_task.epoch_receiver(),
367 state.cancellation_token.clone(),
368 );
369
370 let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
371 db_reader,
372 watermark_task.epoch_receiver(),
373 state.cancellation_token.clone(),
374 );
375
376 let router = router
377 .route_layer(middleware::from_fn_with_state(
378 state.version,
379 set_version_middleware,
380 ))
381 .route_layer(middleware::from_fn_with_state(
382 state.version,
383 check_version_middleware,
384 ))
385 .layer(axum::extract::Extension(schema))
386 .layer(axum::extract::Extension(watermark_task.lock()))
387 .layer(Self::cors()?);
388
389 Ok(Server {
390 router,
391 address: address
392 .parse()
393 .map_err(|_| Error::Internal(format!("Failed to parse address {address}")))?,
394 watermark_task,
395 system_package_task,
396 trigger_exchange_rates_task,
397 state,
398 })
399 }
400
401 pub async fn from_config(
404 config: &ServerConfig,
405 version: &Version,
406 cancellation_token: CancellationToken,
407 ) -> Result<Self, Error> {
408 let prom_addr: SocketAddr = format!(
410 "{}:{}",
411 config.connection.prom_host, config.connection.prom_port
412 )
413 .parse()
414 .map_err(|_| {
415 Error::Internal(format!(
416 "Failed to parse url {}, port {} into socket address",
417 config.connection.prom_host, config.connection.prom_port
418 ))
419 })?;
420
421 let registry_service = iota_metrics::start_prometheus_server(prom_addr);
422 info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
423 let registry = registry_service.default_registry();
424 registry
425 .register(iota_metrics::uptime_metric(
426 "graphql",
427 version.full,
428 "unknown",
429 ))
430 .unwrap();
431
432 let metrics = Metrics::new(®istry);
434 let indexer_metrics = IndexerMetrics::new(®istry);
435 let state = AppState::new(
436 config.connection.clone(),
437 config.service.clone(),
438 metrics.clone(),
439 cancellation_token,
440 *version,
441 );
442 let mut builder = ServerBuilder::new(state);
443
444 let iota_names_config = config.service.iota_names.clone();
445 let zklogin_config = config.service.zklogin.clone();
446 let reader = PgManager::reader_with_config(
447 config.connection.db_url.clone(),
448 config.connection.db_pool_size,
449 config.service.limits.request_timeout_ms.into(),
453 )
454 .map_err(|e| Error::ServerInit(format!("Failed to create pg connection pool: {e}")))?;
455
456 if !config.connection.skip_migration_consistency_check {
457 info!("Starting compatibility check");
459 let mut connection = get_pool_connection(&reader.get_pool())?;
460 check_db_migration_consistency(&mut connection)?;
461 info!("Compatibility check passed");
462 }
463
464 let db = Db::new(
466 reader.clone(),
467 config.service.limits.clone(),
468 metrics.clone(),
469 );
470 let loader = DataLoader::new(db.clone());
471 let pg_conn_pool = PgManager::new(reader.clone());
472 let package_store = DbPackageStore::new(loader.clone());
473 let resolver = Arc::new(Resolver::new_with_limits(
474 PackageStoreWithLruCache::new(package_store),
475 config.service.limits.package_resolver_limits(),
476 ));
477
478 builder.db_reader = Some(db.clone());
479 builder.resolver = Some(resolver.clone());
480
481 let Some(fullnode_url) = config.tx_exec_full_node.node_rpc_url.as_ref() else {
482 return Err(Error::ServerInit(
483 "No fullnode url found in config".to_string(),
484 ));
485 };
486
487 let graphql_streams =
488 GraphQLStream::new(&config.connection.db_url, reader.clone(), ®istry).await?;
489 let write_api = build_write_api(fullnode_url, reader, indexer_metrics)?;
490
491 builder = builder
492 .context_data(config.service.clone())
493 .context_data(loader)
494 .context_data(db)
495 .context_data(pg_conn_pool)
496 .context_data(resolver)
497 .context_data(write_api)
498 .context_data(iota_names_config)
499 .context_data(zklogin_config)
500 .context_data(metrics.clone())
501 .context_data(config.clone())
502 .context_data(graphql_streams)
503 .context_data(ChainIdentifierCache::default());
504
505 if config.internal_features.feature_gate {
506 builder = builder.extension(FeatureGate);
507 }
508
509 if config.internal_features.logger {
510 builder = builder.extension(Logger::default());
511 }
512
513 if config.internal_features.query_limits_checker {
514 builder = builder.extension(QueryLimitsChecker);
515 }
516
517 if config.internal_features.directive_checker {
518 builder = builder.extension(DirectiveChecker);
519 }
520
521 if config.internal_features.query_timeout {
522 builder = builder.extension(Timeout);
523 }
524
525 if config.internal_features.tracing {
526 builder = builder.extension(Tracing);
527 }
528
529 if config.internal_features.apollo_tracing {
530 builder = builder.extension(ApolloTracing);
531 }
532
533 Ok(builder)
537 }
538}
539
540fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
541 async_graphql::Schema::build(Query, Mutation, Subscription)
542 .limit_complexity(MAX_COMPLEXITY)
543 .register_output_type::<IMoveObject>()
544 .register_output_type::<IObject>()
545 .register_output_type::<IOwner>()
546 .register_output_type::<IMoveDatatype>()
547}
548
549pub fn export_schema() -> String {
551 schema_builder().finish().sdl()
552}
553
554async fn graphql_handler(
559 ConnectInfo(addr): ConnectInfo<SocketAddr>,
560 TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
561 schema: Extension<IotaGraphQLSchema>,
562 Extension(watermark_lock): Extension<WatermarkLock>,
563 headers: HeaderMap,
564 req: GraphQLRequest,
565) -> (axum::http::Extensions, GraphQLResponse) {
566 let mut req = req.into_inner();
567 req.data.insert(PayloadSize(content_length));
568 req.data.insert(Uuid::new_v4());
569 if headers.contains_key(ShowUsage::name()) {
570 req.data.insert(ShowUsage)
571 }
572
573 req.data.insert(addr);
577 req.data.insert(Watermark::new(watermark_lock).await);
578
579 let result = schema.execute(req).await;
580
581 let mut extensions = axum::http::Extensions::new();
584 if result.is_err() {
585 extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
586 };
587 (extensions, result.into())
588}
589
590pub(crate) fn get_write_api<'ctx>(
591 ctx: &'ctx Context<'_>,
592) -> Result<&'ctx OptimisticWriteApi, Error> {
593 ctx.data_opt()
594 .ok_or_else(|| Error::Internal("Unable to get node execution interface".to_string()))
595}
596
597fn build_write_api(
598 fullnode_url: &str,
599 reader: IndexerReader,
600 metrics: IndexerMetrics,
601) -> Result<OptimisticWriteApi, Error> {
602 let json_rpc_client = build_json_rpc_client(fullnode_url)?;
603 let indexer_store = PgIndexerStore::new(reader.get_pool(), metrics.clone());
604 let optimistic_tx_executor =
605 OptimisticTransactionExecutor::new(fullnode_url, reader.clone(), indexer_store, metrics);
606 Ok(OptimisticWriteApi::new(
607 WriteApi::new(json_rpc_client, reader),
608 optimistic_tx_executor,
609 ))
610}
611
612fn build_json_rpc_client(rpc_client_url: &str) -> Result<HttpClient, Error> {
613 let mut headers = HeaderMap::new();
614 headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("graphql"));
615
616 let builder = HttpClientBuilder::default()
617 .max_request_size(2 << 30)
618 .set_headers(headers.clone())
619 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
620 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS);
621
622 builder.build(rpc_client_url).map_err(|e| {
623 warn!("Failed to get new Http client with error: {e:?}");
624 Error::Internal(format!(
625 "Failed to initialize fullnode RPC client with error: {e:?}"
626 ))
627 })
628}
629async fn subscription_handler(
635 ConnectInfo(addr): ConnectInfo<SocketAddr>,
636 Extension(schema): Extension<IotaGraphQLSchema>,
637 req: http::Request<Body>,
638) -> AxumResponse {
639 let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
640 let (mut parts, _body) = req.into_parts();
641
642 let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
644 Ok(protocol) => protocol,
645 Err(err) => return err.into_response(),
646 };
647
648 let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
650 Ok(upgrade) => upgrade,
651 Err(err) => return err.into_response(),
652 };
653
654 let resp = upgrade
655 .protocols(ALL_WEBSOCKET_PROTOCOLS)
656 .on_upgrade(move |stream| async move {
657 let mut connection_data = Data::default();
659 connection_data.insert(PayloadSize(0));
661 connection_data.insert(Uuid::new_v4());
662 if headers_contains_show_usage {
663 connection_data.insert(ShowUsage)
664 }
665 connection_data.insert(addr);
666
667 let connection =
668 GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
669 connection.serve().await;
670 });
671 resp
672}
673
/// Factory for the per-request metrics handler installed through `CallbackLayer`.
#[derive(Clone)]
struct MetricsMakeCallbackHandler {
    /// Metrics handle cloned into every handler that is created.
    metrics: Metrics,
}
678
679impl MakeCallbackHandler for MetricsMakeCallbackHandler {
680 type Handler = MetricsCallbackHandler;
681
682 fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
683 let start = Instant::now();
684 let metrics = self.metrics.clone();
685
686 metrics.request_metrics.inflight_requests.inc();
687 metrics.inc_num_queries();
688
689 MetricsCallbackHandler { metrics, start }
690 }
691}
692
/// Per-request metrics handler; its `Drop` impl records the latency and decrements the
/// in-flight gauge.
struct MetricsCallbackHandler {
    /// Metrics handle the observations are reported to.
    metrics: Metrics,
    /// Instant at which the handler was created.
    start: Instant,
}
697
698impl ResponseHandler for MetricsCallbackHandler {
699 fn on_response(&mut self, response: &http::response::Parts) {
700 if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
701 self.metrics.inc_errors(&errors.0);
702 }
703 }
704
705 fn on_error<E>(&mut self, _error: &E) {
706 }
711}
712
713impl Drop for MetricsCallbackHandler {
714 fn drop(&mut self) {
715 self.metrics.query_latency(self.start.elapsed());
716 self.metrics.request_metrics.inflight_requests.dec();
717 }
718}
719
/// Errors of an executed GraphQL request, carried in the HTTP response extensions so the
/// metrics callback can count them.
#[derive(Debug, Clone)]
struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
722
723async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
725 let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
726 return StatusCode::INTERNAL_SERVER_ERROR;
727 };
728
729 let Some(host) = url.host_str() else {
730 return StatusCode::INTERNAL_SERVER_ERROR;
731 };
732
733 let tcp_url = if let Some(port) = url.port() {
734 format!("{host}:{port}")
735 } else {
736 host.to_string()
737 };
738
739 if TcpStream::connect(tcp_url).is_err() {
740 StatusCode::INTERNAL_SERVER_ERROR
741 } else {
742 StatusCode::OK
743 }
744}
745
/// Query parameters accepted by the health endpoints.
#[derive(serde::Deserialize)]
struct HealthParam {
    /// Maximum tolerated checkpoint lag in milliseconds; `health_check` falls back to
    /// `DEFAULT_MAX_CHECKPOINT_LAG` when it is absent.
    max_checkpoint_lag_ms: Option<u64>,
}
750
751async fn health_check(
757 State(connection): State<ConnectionConfig>,
758 Extension(watermark_lock): Extension<WatermarkLock>,
759 AxumQuery(query_params): AxumQuery<HealthParam>,
760) -> StatusCode {
761 let db_health_check = db_health_check(axum::extract::State(connection)).await;
762 if db_health_check != StatusCode::OK {
763 return db_health_check;
764 }
765
766 let max_checkpoint_lag_ms = query_params
767 .max_checkpoint_lag_ms
768 .map(Duration::from_millis)
769 .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
770
771 let checkpoint_timestamp =
772 Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);
773
774 let now_millis = Utc::now().timestamp_millis();
775
776 let now: Duration = match u64::try_from(now_millis) {
778 Ok(val) => Duration::from_millis(val),
779 Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
780 };
781
782 if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
783 return StatusCode::GATEWAY_TIMEOUT;
784 }
785
786 db_health_check
787}
788
789async fn get_or_init_server_start_time() -> &'static Instant {
791 static ONCE: OnceCell<Instant> = OnceCell::const_new();
792 ONCE.get_or_init(|| async move { Instant::now() }).await
793}
794
795pub mod tests {
796 use std::{sync::Arc, time::Duration};
797
798 use async_graphql::{
799 Request, Response, Variables,
800 extensions::{Extension, ExtensionContext, NextExecute},
801 };
802 use iota_types::transaction::TransactionData;
803 use serde_json::json;
804 use uuid::Uuid;
805
806 use super::*;
807 use crate::{
808 config::{ConnectionConfig, Limits, ServiceConfig, Version},
809 context_data::db_data_provider::PgManager,
810 extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
811 test_infra::cluster::Cluster,
812 };
813
814 fn prep_schema(
818 connection_config: Option<ConnectionConfig>,
819 service_config: Option<ServiceConfig>,
820 ) -> ServerBuilder {
821 let connection_config = connection_config.unwrap_or_default();
822 let service_config = service_config.unwrap_or_default();
823
824 let reader = PgManager::reader_with_config(
825 connection_config.db_url.clone(),
826 connection_config.db_pool_size,
827 service_config.limits.request_timeout_ms.into(),
828 )
829 .expect("failed to create pg connection pool");
830
831 let version = Version::for_testing();
832 let metrics = metrics();
833 let db = Db::new(
834 reader.clone(),
835 service_config.limits.clone(),
836 metrics.clone(),
837 );
838 let loader = DataLoader::new(db.clone());
839 let pg_conn_pool = PgManager::new(reader);
840 let cancellation_token = CancellationToken::new();
841 let watermark = Watermark {
842 checkpoint: 1,
843 checkpoint_timestamp_ms: 1,
844 epoch: 0,
845 };
846 let state = AppState::new(
847 connection_config.clone(),
848 service_config.clone(),
849 metrics.clone(),
850 cancellation_token.clone(),
851 version,
852 );
853 ServerBuilder::new(state)
854 .context_data(db)
855 .context_data(loader)
856 .context_data(pg_conn_pool)
857 .context_data(service_config)
858 .context_data(query_id())
859 .context_data(ip_address())
860 .context_data(watermark)
861 .context_data(ChainIdentifierCache::default())
862 .context_data(metrics)
863 }
864
865 fn metrics() -> Metrics {
866 let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap();
867 let registry = iota_metrics::start_prometheus_server(binding_address).default_registry();
868 Metrics::new(®istry)
869 }
870
871 fn ip_address() -> SocketAddr {
872 let binding_address: SocketAddr = "0.0.0.0:51515".parse().unwrap();
873 binding_address
874 }
875
    /// Returns a freshly generated random (v4) UUID used as the query id in the test
    /// context.
    fn query_id() -> Uuid {
        Uuid::new_v4()
    }
879
    /// Checks the `Timeout` extension for both queries and mutations against `cluster`.
    ///
    /// A query with a timeout larger than the injected delay must succeed; a query and a
    /// mutation whose timeout equals the injected delay must fail with the respective
    /// timeout message.
    pub async fn test_timeout_impl(cluster: &Cluster) {
        /// Test extension that sleeps for `min_req_delay` before executing the request.
        struct TimedExecuteExt {
            pub min_req_delay: Duration,
        }

        impl ExtensionFactory for TimedExecuteExt {
            fn create(&self) -> Arc<dyn Extension> {
                Arc::new(TimedExecuteExt {
                    min_req_delay: self.min_req_delay,
                })
            }
        }

        #[async_trait::async_trait]
        impl Extension for TimedExecuteExt {
            async fn execute(
                &self,
                ctx: &ExtensionContext<'_>,
                operation_name: Option<&str>,
                next: NextExecute<'_>,
            ) -> Response {
                // Delay first so the request takes at least `min_req_delay`.
                tokio::time::sleep(self.min_req_delay).await;
                next.run(ctx, operation_name).await
            }
        }

        /// Executes `query` on a schema whose request and mutation timeouts are both
        /// `timeout` and whose execution is delayed by `delay`.
        async fn test_timeout(
            delay: Duration,
            timeout: Duration,
            query: &str,
            write_api: OptimisticWriteApi,
        ) -> Response {
            let mut cfg = ServiceConfig::default();
            cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
            cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;

            let schema = prep_schema(None, Some(cfg))
                .context_data(write_api)
                .extension(Timeout)
                .extension(TimedExecuteExt {
                    min_req_delay: delay,
                })
                .build_schema();

            schema.execute(query).await
        }

        // Assemble a write API backed by the cluster's fullnode and indexer store.
        let wallet = &cluster.validator_fullnode_handle.wallet;
        let store = &cluster.indexer_store;
        let fn_rpc_url = &cluster.validator_fullnode_handle.fullnode_handle.rpc_url;
        let indexer_metrics = store.get_metrics();
        let indexer_reader = iota_indexer::read::IndexerReader::new(store.blocking_cp());
        let json_rpc_client = build_json_rpc_client(fn_rpc_url).unwrap();

        let optimistic_tx_executor =
            iota_indexer::optimistic_indexing::OptimisticTransactionExecutor::new(
                fn_rpc_url,
                indexer_reader.clone(),
                store.clone(),
                indexer_metrics,
            );
        let write_api = OptimisticWriteApi::new(
            WriteApi::new(json_rpc_client, indexer_reader),
            optimistic_tx_executor,
        );

        let query = "{ chainIdentifier }";
        let timeout = Duration::from_millis(1000);
        let delay = Duration::from_millis(100);

        // The timeout exceeds the delay: the query must succeed.
        test_timeout(delay, timeout, query, write_api.clone())
            .await
            .into_result()
            .expect("should complete successfully");

        // The timeout equals the delay: the query must time out.
        let errs: Vec<_> = test_timeout(delay, delay, query, write_api.clone())
            .await
            .into_result()
            .unwrap_err()
            .into_iter()
            .map(|e| e.message)
            .collect();
        let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
        assert_eq!(errs, vec![exp]);

        // Build and sign a transfer transaction to exercise the mutation timeout.
        let addresses = wallet.get_addresses();
        let gas = wallet
            .get_one_gas_object_owned_by_address(addresses[0])
            .await
            .unwrap();
        let tx_data = TransactionData::new_transfer_iota(
            addresses[1],
            addresses[0],
            Some(1000),
            gas.unwrap(),
            1_000_000,
            wallet.get_reference_gas_price().await.unwrap(),
        );

        let tx = wallet.sign_transaction(&tx_data);
        let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();

        let signature_base64 = &signatures[0];
        let query = format!(
            r#"
            mutation {{
              executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
                effects {{
                  status
                }}
              }}
            }}"#,
            tx_bytes.encoded(),
            signature_base64.encoded()
        );
        // The timeout equals the delay: the mutation must time out.
        let errs: Vec<_> = test_timeout(delay, delay, &query, write_api.clone())
            .await
            .into_result()
            .unwrap_err()
            .into_iter()
            .map(|e| e.message)
            .collect();
        let exp = format!(
            "Mutation request timed out. Limit: {}s",
            delay.as_secs_f32()
        );
        assert_eq!(errs, vec![exp]);
    }
1012
1013 pub async fn test_query_depth_limit_impl() {
1014 async fn exec_query_depth_limit(depth: u32, query: &str) -> Response {
1015 let service_config = ServiceConfig {
1016 limits: Limits {
1017 max_query_depth: depth,
1018 ..Default::default()
1019 },
1020 ..Default::default()
1021 };
1022
1023 let schema = prep_schema(None, Some(service_config))
1024 .context_data(PayloadSize(100))
1025 .extension(QueryLimitsChecker)
1026 .build_schema();
1027 schema.execute(query).await
1028 }
1029
1030 exec_query_depth_limit(1, "{ chainIdentifier }")
1031 .await
1032 .into_result()
1033 .expect("should complete successfully");
1034
1035 exec_query_depth_limit(
1036 5,
1037 "{ chainIdentifier protocolConfig { configs { value key }} }",
1038 )
1039 .await
1040 .into_result()
1041 .expect("should complete successfully");
1042
1043 let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }")
1045 .await
1046 .into_result()
1047 .unwrap_err()
1048 .into_iter()
1049 .map(|e| e.message)
1050 .collect();
1051
1052 assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
1053 let errs: Vec<_> = exec_query_depth_limit(
1054 2,
1055 "{ chainIdentifier protocolConfig { configs { value key }} }",
1056 )
1057 .await
1058 .into_result()
1059 .unwrap_err()
1060 .into_iter()
1061 .map(|e| e.message)
1062 .collect();
1063 assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
1064 }
1065
1066 pub async fn test_query_node_limit_impl() {
1067 async fn exec_query_node_limit(nodes: u32, query: &str) -> Response {
1068 let service_config = ServiceConfig {
1069 limits: Limits {
1070 max_query_nodes: nodes,
1071 ..Default::default()
1072 },
1073 ..Default::default()
1074 };
1075
1076 let schema = prep_schema(None, Some(service_config))
1077 .context_data(PayloadSize(100))
1078 .extension(QueryLimitsChecker)
1079 .build_schema();
1080 schema.execute(query).await
1081 }
1082
1083 exec_query_node_limit(1, "{ chainIdentifier }")
1084 .await
1085 .into_result()
1086 .expect("should complete successfully");
1087
1088 exec_query_node_limit(
1089 5,
1090 "{ chainIdentifier protocolConfig { configs { value key }} }",
1091 )
1092 .await
1093 .into_result()
1094 .expect("should complete successfully");
1095
1096 let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }")
1098 .await
1099 .into_result()
1100 .unwrap_err()
1101 .into_iter()
1102 .map(|e| e.message)
1103 .collect();
1104 assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1105
1106 let err: Vec<_> = exec_query_node_limit(
1107 4,
1108 "{ chainIdentifier protocolConfig { configs { value key }} }",
1109 )
1110 .await
1111 .into_result()
1112 .unwrap_err()
1113 .into_iter()
1114 .map(|e| e.message)
1115 .collect();
1116 assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1117 }
1118
1119 pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) {
1120 let service_config = ServiceConfig {
1121 limits: Limits {
1122 default_page_size: 1,
1123 ..Default::default()
1124 },
1125 ..Default::default()
1126 };
1127 let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema();
1128
1129 let resp = schema
1130 .execute("{ checkpoints { nodes { sequenceNumber } } }")
1131 .await;
1132 let data = resp.data.clone().into_json().unwrap();
1133 let checkpoints = data
1134 .get("checkpoints")
1135 .unwrap()
1136 .get("nodes")
1137 .unwrap()
1138 .as_array()
1139 .unwrap();
1140 assert_eq!(
1141 checkpoints.len(),
1142 1,
1143 "Checkpoints should have exactly one element"
1144 );
1145
1146 let resp = schema
1147 .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1148 .await;
1149 let data = resp.data.clone().into_json().unwrap();
1150 let checkpoints = data
1151 .get("checkpoints")
1152 .unwrap()
1153 .get("nodes")
1154 .unwrap()
1155 .as_array()
1156 .unwrap();
1157 assert_eq!(
1158 checkpoints.len(),
1159 2,
1160 "Checkpoints should return two elements"
1161 );
1162 }
1163
1164 pub async fn test_query_max_page_limit_impl() {
1165 let schema = prep_schema(None, None).build_schema();
1166
1167 schema
1168 .execute("{ objects(first: 1) { nodes { version } } }")
1169 .await
1170 .into_result()
1171 .expect("should complete successfully");
1172
1173 let err: Vec<_> = schema
1175 .execute("{ objects(first: 51) { nodes { version } } }")
1176 .await
1177 .into_result()
1178 .unwrap_err()
1179 .into_iter()
1180 .map(|e| e.message)
1181 .collect();
1182 assert_eq!(
1183 err,
1184 vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1185 );
1186 }
1187
1188 pub async fn test_query_complexity_metrics_impl() {
1189 let server_builder = prep_schema(None, None).context_data(PayloadSize(100));
1190 let metrics = server_builder.state.metrics.clone();
1191 let schema = server_builder
1192 .extension(QueryLimitsChecker) .build_schema();
1194
1195 schema
1196 .execute("{ chainIdentifier }")
1197 .await
1198 .into_result()
1199 .expect("should complete successfully");
1200
1201 let req_metrics = metrics.request_metrics;
1202 assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1203 assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1204 assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1205 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1206 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1207 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1208
1209 schema
1210 .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1211 .await
1212 .into_result()
1213 .expect("should complete successfully");
1214
1215 assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1216 assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1217 assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1218 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1219 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1220 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1221 }
1222
1223 pub async fn test_health_check_impl() {
1224 let server_builder = prep_schema(None, None);
1225 let url = format!(
1226 "http://{}:{}/health",
1227 server_builder.state.connection.host, server_builder.state.connection.port
1228 );
1229 server_builder.build_schema();
1230
1231 let resp = reqwest::get(&url).await.unwrap();
1232 assert_eq!(resp.status(), reqwest::StatusCode::OK);
1233
1234 let url_with_param = format!("{url}?max_checkpoint_lag_ms=1");
1235 let resp = reqwest::get(&url_with_param).await.unwrap();
1236 assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1237 }
1238
1239 async fn execute_for_error(limits: Limits, request: Request) -> String {
1242 let service_config = ServiceConfig {
1243 limits,
1244 ..Default::default()
1245 };
1246
1247 let schema = prep_schema(None, Some(service_config))
1248 .context_data(PayloadSize(
1249 serde_json::to_string(&request).unwrap().len() as u64,
1254 ))
1255 .extension(QueryLimitsChecker)
1256 .build_schema();
1257
1258 let errs: Vec<_> = schema
1259 .execute(request)
1260 .await
1261 .into_result()
1262 .unwrap_err()
1263 .into_iter()
1264 .map(|e| e.message)
1265 .collect();
1266
1267 errs.join("\n")
1268 }
1269
1270 pub async fn test_payload_read_exceeded_impl() {
1271 assert_eq!(
1272 execute_for_error(
1273 Limits {
1274 max_tx_payload_size: 400,
1275 max_query_payload_size: 10,
1276 ..Default::default()
1277 },
1278 r#"
1279 mutation {
1280 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1281 effects {
1282 status
1283 }
1284 }
1285 }
1286 "#
1287 .into()
1288 )
1289 .await,
1290 "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1291 transaction payloads (all inputs to executeTransactionBlock or \
1292 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 \
1293 bytes or fewer."
1294 );
1295 }
1296
1297 pub async fn test_payload_mutation_exceeded_impl() {
1298 assert_eq!(
1299 execute_for_error(
1300 Limits {
1301 max_tx_payload_size: 10,
1302 max_query_payload_size: 400,
1303 ..Default::default()
1304 },
1305 r#"
1306 mutation {
1307 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1308 effects {
1309 status
1310 }
1311 }
1312 }
1313 "#
1314 .into()
1315 )
1316 .await,
1317 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1318 transaction payloads (all inputs to executeTransactionBlock or \
1319 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1320 bytes or fewer."
1321 );
1322 }
1323
1324 pub async fn test_payload_dry_run_exceeded_impl() {
1325 assert_eq!(
1326 execute_for_error(
1327 Limits {
1328 max_tx_payload_size: 10,
1329 max_query_payload_size: 400,
1330 ..Default::default()
1331 },
1332 r#"
1333 query {
1334 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1335 error
1336 transaction {
1337 digest
1338 }
1339 }
1340 }
1341 "#
1342 .into(),
1343 )
1344 .await,
1345 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1346 transaction payloads (all inputs to executeTransactionBlock or \
1347 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 bytes \
1348 or fewer."
1349 );
1350 }
1351
1352 pub async fn test_payload_total_exceeded_impl() {
1353 assert_eq!(
1354 execute_for_error(
1355 Limits {
1356 max_tx_payload_size: 10,
1357 max_query_payload_size: 10,
1358 ..Default::default()
1359 },
1360 r#"
1361 query {
1362 dryRunTransactionBlock(txByte: "AAABBB") {
1363 error
1364 transaction {
1365 digest
1366 }
1367 }
1368 }
1369 "#
1370 .into(),
1371 )
1372 .await,
1373 "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1374 transaction payloads (all inputs to executeTransactionBlock or dryRunTransactionBlock) \
1375 and the rest of the request (the query part) must be 10 bytes or fewer."
1376 );
1377 }
1378
1379 pub async fn test_payload_using_vars_mutation_exceeded_impl() {
1380 assert_eq!(
1381 execute_for_error(
1382 Limits {
1383 max_tx_payload_size: 10,
1384 max_query_payload_size: 500,
1385 ..Default::default()
1386 },
1387 Request::new(
1388 r#"
1389 mutation ($tx: String!, $sigs: [String!]!) {
1390 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1391 effects {
1392 status
1393 }
1394 }
1395 }
1396 "#
1397 )
1398 .variables(Variables::from_json(json!({
1399 "tx": "AAABBBCCC",
1400 "sigs": ["BBB"]
1401 })))
1402 )
1403 .await,
1404 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1405 transaction payloads (all inputs to executeTransactionBlock or \
1406 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1407 bytes or fewer."
1408 );
1409 }
1410
1411 pub async fn test_payload_using_vars_read_exceeded_impl() {
1412 assert_eq!(
1413 execute_for_error(
1414 Limits {
1415 max_tx_payload_size: 500,
1416 max_query_payload_size: 10,
1417 ..Default::default()
1418 },
1419 Request::new(
1420 r#"
1421 mutation ($tx: String!, $sigs: [String!]!) {
1422 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1423 effects {
1424 status
1425 }
1426 }
1427 }
1428 "#
1429 )
1430 .variables(Variables::from_json(json!({
1431 "tx": "AAA",
1432 "sigs": ["BBB"]
1433 })))
1434 )
1435 .await,
1436 "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1437 transaction payloads (all inputs to executeTransactionBlock or \
1438 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1439 or fewer."
1440 );
1441 }
1442
1443 pub async fn test_payload_using_vars_dry_run_exceeded_impl() {
1444 assert_eq!(
1445 execute_for_error(
1446 Limits {
1447 max_tx_payload_size: 10,
1448 max_query_payload_size: 400,
1449 ..Default::default()
1450 },
1451 Request::new(
1452 r#"
1453 query ($tx: String!) {
1454 dryRunTransactionBlock(txBytes: $tx) {
1455 error
1456 transaction {
1457 digest
1458 }
1459 }
1460 }
1461 "#
1462 )
1463 .variables(Variables::from_json(json!({
1464 "tx": "AAABBBCCC"
1465 }))),
1466 )
1467 .await,
1468 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1469 transaction payloads (all inputs to executeTransactionBlock or \
1470 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1471 bytes or fewer."
1472 );
1473 }
1474
1475 pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() {
1476 assert_eq!(
1477 execute_for_error(
1478 Limits {
1479 max_tx_payload_size: 400,
1480 max_query_payload_size: 10,
1481 ..Default::default()
1482 },
1483 Request::new(
1484 r#"
1485 query ($tx: String!) {
1486 dryRunTransactionBlock(txBytes: $tx) {
1487 error
1488 transaction {
1489 digest
1490 }
1491 }
1492 }
1493 "#
1494 )
1495 .variables(Variables::from_json(json!({
1496 "tx": "AAABBBCCC"
1497 }))),
1498 )
1499 .await,
1500 "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1501 transaction payloads (all inputs to executeTransactionBlock or \
1502 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1503 or fewer."
1504 );
1505 }
1506
1507 pub async fn test_payload_multiple_execution_exceeded_impl() {
1508 let err = execute_for_error(
1511 Limits {
1512 max_tx_payload_size: 30,
1513 max_query_payload_size: 320,
1514 ..Default::default()
1515 },
1516 r#"
1517 mutation {
1518 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1519 effects {
1520 status
1521 }
1522 }
1523 }
1524 "#
1525 .into(),
1526 )
1527 .await;
1528 assert!(err.starts_with("Query part too large"), "{err}");
1529
1530 assert_eq!(
1531 execute_for_error(
1532 Limits {
1533 max_tx_payload_size: 30,
1534 max_query_payload_size: 800,
1535 ..Default::default()
1536 },
1537 r#"
1538 mutation {
1539 e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1540 effects {
1541 status
1542 }
1543 }
1544 e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1545 effects {
1546 status
1547 }
1548 }
1549 }
1550 "#
1551 .into()
1552 )
1553 .await,
1554 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1555 transaction payloads (all inputs to executeTransactionBlock or \
1556 dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1557 bytes or fewer."
1558 );
1559 }
1560
1561 pub async fn test_payload_multiple_dry_run_exceeded_impl() {
1562 let err = execute_for_error(
1565 Limits {
1566 max_tx_payload_size: 20,
1567 max_query_payload_size: 330,
1568 ..Default::default()
1569 },
1570 r#"
1571 query {
1572 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1573 error
1574 transaction {
1575 digest
1576 }
1577 }
1578 }
1579 "#
1580 .into(),
1581 )
1582 .await;
1583 assert!(err.starts_with("Query part too large"), "{err}");
1584
1585 assert_eq!(
1586 execute_for_error(
1587 Limits {
1588 max_tx_payload_size: 20,
1589 max_query_payload_size: 800,
1590 ..Default::default()
1591 },
1592 r#"
1593 query {
1594 d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1595 error
1596 transaction {
1597 digest
1598 }
1599 }
1600 d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1601 error
1602 transaction {
1603 digest
1604 }
1605 }
1606 }
1607 "#
1608 .into()
1609 )
1610 .await,
1611 "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1612 transaction payloads (all inputs to executeTransactionBlock or \
1613 dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1614 bytes or fewer."
1615 );
1616 }
1617
1618 pub async fn test_payload_execution_multiple_sigs_exceeded_impl() {
1619 let err = execute_for_error(
1622 Limits {
1623 max_tx_payload_size: 30,
1624 max_query_payload_size: 320,
1625 ..Default::default()
1626 },
1627 r#"
1628 mutation {
1629 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1630 effects {
1631 status
1632 }
1633 }
1634 }
1635 "#
1636 .into(),
1637 )
1638 .await;
1639
1640 assert!(err.starts_with("Query part too large"), "{err}");
1641
1642 assert_eq!(
1643 execute_for_error(
1644 Limits {
1645 max_tx_payload_size: 30,
1646 max_query_payload_size: 500,
1647 ..Default::default()
1648 },
1649 r#"
1650 mutation {
1651 executeTransactionBlock(
1652 txBytes: "AAA",
1653 signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1654 ) {
1655 effects {
1656 status
1657 }
1658 }
1659 }
1660 "#
1661 .into(),
1662 )
1663 .await,
1664 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1665 transaction payloads (all inputs to executeTransactionBlock or \
1666 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1667 bytes or fewer.",
1668 )
1669 }
1670
1671 pub async fn test_payload_sig_var_execution_exceeded_impl() {
1672 assert_eq!(
1675 execute_for_error(
1676 Limits {
1677 max_tx_payload_size: 10,
1678 max_query_payload_size: 500,
1679 ..Default::default()
1680 },
1681 Request::new(
1682 r#"
1683 mutation ($tx: String!, $sig: String!) {
1684 executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1685 effects {
1686 status
1687 }
1688 }
1689 }
1690 "#
1691 )
1692 .variables(Variables::from_json(json!({
1693 "tx": "AAA",
1694 "sig": "BBB"
1695 })))
1696 )
1697 .await,
1698 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1699 transaction payloads (all inputs to executeTransactionBlock or \
1700 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1701 bytes or fewer."
1702 );
1703 }
1704
/// Returns `true` when `err` does not start with either of the two messages
/// that report a size failure involving the transaction payload, i.e.
/// "Overall request too large" or "Transaction payload too large".
///
/// Any other error text (including an empty string) counts as having passed.
fn passed_tx_checks(err: &str) -> bool {
    const TX_CHECK_FAILURES: [&str; 2] = [
        "Overall request too large",
        "Transaction payload too large",
    ];

    !TX_CHECK_FAILURES
        .iter()
        .any(|&prefix| err.starts_with(prefix))
}
1711
1712 pub async fn test_payload_reusing_vars_execution_impl() {
1713 assert!(!passed_tx_checks(
1719 &execute_for_error(
1720 Limits {
1721 max_tx_payload_size: 1,
1722 max_query_payload_size: 1,
1723 ..Default::default()
1724 },
1725 r#"
1726 mutation {
1727 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1728 effects {
1729 status
1730 }
1731 }
1732 }
1733 "#
1734 .into()
1735 )
1736 .await
1737 ));
1738
1739 let limits = Limits {
1740 max_tx_payload_size: 20,
1741 max_query_payload_size: 1000,
1742 ..Default::default()
1743 };
1744
1745 assert!(passed_tx_checks(
1748 &execute_for_error(
1749 limits.clone(),
1750 Request::new(
1751 r#"
1752 mutation ($sig: String!) {
1753 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1754 effects {
1755 status
1756 }
1757 }
1758 }
1759 "#,
1760 )
1761 .variables(Variables::from_json(json!({
1762 "sig": "BBB"
1763 })))
1764 )
1765 .await
1766 ));
1767
1768 let execution_result = execute_for_error(
1771 limits.clone(),
1772 Request::new(
1773 r#"
1774 mutation ($sig: String!) {
1775 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1776 effects {
1777 status
1778 }
1779 }
1780 }
1781 "#,
1782 )
1783 .variables(Variables::from_json(json!({
1784 "sig": "BBB"
1785 }))),
1786 )
1787 .await;
1788 assert!(!passed_tx_checks(&execution_result), "{execution_result}");
1789
1790 let execution_result = execute_for_error(
1793 limits,
1794 Request::new(
1795 r#"
1796 mutation ($sig: String!) {
1797 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1798 effects {
1799 status
1800 }
1801 }
1802 }
1803 "#,
1804 )
1805 .variables(Variables::from_json(json!({
1806 "sig": "BBB"
1807 }))),
1808 )
1809 .await;
1810 assert!(passed_tx_checks(&execution_result), "{execution_result}");
1811 }
1812
1813 pub async fn test_payload_reusing_vars_dry_run_impl() {
1814 let limits = Limits {
1818 max_tx_payload_size: 20,
1819 max_query_payload_size: 1000,
1820 ..Default::default()
1821 };
1822
1823 assert!(passed_tx_checks(
1825 &execute_for_error(
1826 limits.clone(),
1827 Request::new(
1828 r#"
1829 query ($tx: String!) {
1830 dryRunTransactionBlock(txBytes: $tx) {
1831 error
1832 transaction {
1833 digest
1834 }
1835 }
1836 }
1837 "#,
1838 )
1839 .variables(Variables::from_json(json!({
1840 "tx": "AAABBBCCC"
1841 })))
1842 )
1843 .await
1844 ));
1845
1846 assert!(!passed_tx_checks(
1848 &execute_for_error(
1849 limits.clone(),
1850 Request::new(
1851 r#"
1852 query ($tx: String!) {
1853 d0: dryRunTransactionBlock(txBytes: $tx) {
1854 error
1855 transaction {
1856 digest
1857 }
1858 }
1859
1860 d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1861 error
1862 transaction {
1863 digest
1864 }
1865 }
1866 }
1867 "#,
1868 )
1869 .variables(Variables::from_json(json!({
1870 "tx": "AAABBBCCC"
1871 })))
1872 )
1873 .await
1874 ));
1875
1876 assert!(passed_tx_checks(
1878 &execute_for_error(
1879 limits,
1880 Request::new(
1881 r#"
1882 query ($tx: String!) {
1883 d0: dryRunTransactionBlock(txBytes: $tx) {
1884 error
1885 transaction {
1886 digest
1887 }
1888 }
1889
1890 d1: dryRunTransactionBlock(txBytes: $tx) {
1891 error
1892 transaction {
1893 digest
1894 }
1895 }
1896 }
1897 "#,
1898 )
1899 .variables(Variables::from_json(json!({
1900 "tx": "AAABBBCCC"
1901 })))
1902 )
1903 .await
1904 ));
1905 }
1906
1907 pub async fn test_payload_named_fragment_execution_exceeded_impl() {
1908 assert_eq!(
1909 execute_for_error(
1910 Limits {
1911 max_tx_payload_size: 10,
1912 max_query_payload_size: 500,
1913 ..Default::default()
1914 },
1915 r#"
1916 mutation {
1917 ...Tx
1918 }
1919
1920 fragment Tx on Mutation {
1921 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1922 effects {
1923 status
1924 }
1925 }
1926 }
1927 "#
1928 .into()
1929 )
1930 .await,
1931 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1932 transaction payloads (all inputs to executeTransactionBlock or \
1933 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1934 bytes or fewer."
1935 );
1936 }
1937
1938 pub async fn test_payload_inline_fragment_execution_exceeded_impl() {
1939 assert_eq!(
1940 execute_for_error(
1941 Limits {
1942 max_tx_payload_size: 10,
1943 max_query_payload_size: 500,
1944 ..Default::default()
1945 },
1946 r#"
1947 mutation {
1948 ... on Mutation {
1949 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1950 effects {
1951 status
1952 }
1953 }
1954 }
1955 }
1956 "#
1957 .into()
1958 )
1959 .await,
1960 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1961 transaction payloads (all inputs to executeTransactionBlock or \
1962 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1963 bytes or fewer."
1964 );
1965 }
1966
1967 pub async fn test_payload_named_fragment_dry_run_exceeded_impl() {
1968 assert_eq!(
1969 execute_for_error(
1970 Limits {
1971 max_tx_payload_size: 10,
1972 max_query_payload_size: 500,
1973 ..Default::default()
1974 },
1975 r#"
1976 query {
1977 ...DryRun
1978 }
1979
1980 fragment DryRun on Query {
1981 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1982 error
1983 transaction {
1984 digest
1985 }
1986 }
1987 }
1988 "#
1989 .into(),
1990 )
1991 .await,
1992 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1993 transaction payloads (all inputs to executeTransactionBlock or \
1994 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1995 bytes or fewer."
1996 );
1997 }
1998
1999 pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() {
2000 assert_eq!(
2001 execute_for_error(
2002 Limits {
2003 max_tx_payload_size: 10,
2004 max_query_payload_size: 500,
2005 ..Default::default()
2006 },
2007 r#"
2008 query {
2009 ... on Query {
2010 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2011 error
2012 transaction {
2013 digest
2014 }
2015 }
2016 }
2017 }
2018 "#
2019 .into(),
2020 )
2021 .await,
2022 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2023 transaction payloads (all inputs to executeTransactionBlock or \
2024 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
2025 bytes or fewer."
2026 );
2027 }
2028}