1use std::{
5 any::Any,
6 convert::Infallible,
7 net::{SocketAddr, TcpStream},
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_graphql::{
13 Context, Data, Schema, SchemaBuilder,
14 extensions::{ApolloTracing, ExtensionFactory, Tracing},
15 http::ALL_WEBSOCKET_PROTOCOLS,
16};
17use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
18use axum::{
19 Extension, Router,
20 body::Body,
21 extract::{
22 ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
23 },
24 http::{HeaderMap, StatusCode},
25 middleware::{self},
26 response::{IntoResponse, Response as AxumResponse},
27 routing::{MethodRouter, Route, get, post},
28};
29use axum_extra::{TypedHeader, headers::ContentLength};
30use chrono::Utc;
31use http::{HeaderValue, Method, Request};
32use iota_graphql_rpc_headers::LIMITS_HEADER;
33use iota_grpc_client::Client as GrpcClient;
34use iota_indexer::{
35 apis::{OptimisticWriteApi, ReadApi, WriteApi},
36 db::{get_pool_connection, setup_postgres::check_db_migration_consistency},
37 metrics::IndexerMetrics,
38 optimistic_indexing::OptimisticTransactionExecutor,
39 read::IndexerReader,
40 store::PgIndexerStore,
41};
42use iota_metrics::spawn_monitored_task;
43use iota_network_stack::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
44use iota_package_resolver::{PackageStoreWithLruCache, Resolver};
45use tokio::{join, net::TcpListener, sync::OnceCell};
46use tokio_util::sync::CancellationToken;
47use tower::{Layer, Service};
48use tower_http::cors::{AllowOrigin, CorsLayer};
49use tracing::info;
50use uuid::Uuid;
51
52use crate::{
53 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
54 context_data::db_data_provider::PgManager,
55 data::{
56 DataLoader, Db,
57 package_resolver::{DbPackageStore, PackageResolver},
58 },
59 error::Error,
60 extensions::{
61 directive_checker::DirectiveChecker,
62 feature_gate::FeatureGate,
63 logger::Logger,
64 query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
65 timeout::Timeout,
66 },
67 metrics::Metrics,
68 mutation::Mutation,
69 server::{
70 exchange_rates_task::TriggerExchangeRatesTask,
71 system_package_task::SystemPackageTask,
72 version::{check_version_middleware, set_version_middleware},
73 watermark_task::{Watermark, WatermarkLock, WatermarkTask},
74 },
75 types::{
76 chain_identifier::ChainIdentifierCache,
77 datatype::IMoveDatatype,
78 move_object::IMoveObject,
79 object::IObject,
80 owner::IOwner,
81 query::{IotaGraphQLSchema, Query},
82 subscription::{GraphQLStream, Subscription},
83 },
84};
85
/// Checkpoint lag tolerated by `health_check` when the request does not
/// supply a `max_checkpoint_lag_ms` query parameter.
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);
/// Complexity limit applied to every schema produced by `schema_builder`.
const MAX_COMPLEXITY: usize = 6000;
91
/// A fully assembled GraphQL service: the HTTP router together with the
/// background tasks that [`Server::run`] spawns next to it.
pub(crate) struct Server {
    /// Router handed to `axum::serve`.
    router: Router,
    /// Socket address the TCP listener binds to.
    address: SocketAddr,
    /// Background task started by `run` ("watermark update task").
    watermark_task: WatermarkTask,
    /// Background task started by `run` ("system package task").
    system_package_task: SystemPackageTask,
    /// Background task started by `run` ("trigger exchange rates task").
    trigger_exchange_rates_task: TriggerExchangeRatesTask,
    /// Shared application state; `run` takes the cancellation token from it.
    state: AppState,
}
100
101impl Server {
102 pub async fn run(mut self) -> Result<(), Error> {
106 get_or_init_server_start_time().await;
107
108 let watermark_task = {
112 info!("Starting watermark update task");
113 spawn_monitored_task!(async move {
114 self.watermark_task.run().await;
115 })
116 };
117
118 let system_package_task = {
121 info!("Starting system package task");
122 spawn_monitored_task!(async move {
123 self.system_package_task.run().await;
124 })
125 };
126
127 let trigger_exchange_rates_task = {
128 info!("Starting trigger exchange rates task");
129 spawn_monitored_task!(async move {
130 self.trigger_exchange_rates_task.run().await;
131 })
132 };
133
134 let server_task = {
135 info!("Starting graphql service");
136 let cancellation_token = self.state.cancellation_token.clone();
137 let address = self.address;
138 let router = self.router;
139 spawn_monitored_task!(async move {
140 axum::serve(
141 TcpListener::bind(address)
142 .await
143 .map_err(|e| Error::Internal(format!("listener bind failed: {e}")))?,
144 router.into_make_service_with_connect_info::<SocketAddr>(),
145 )
146 .with_graceful_shutdown(async move {
147 cancellation_token.cancelled().await;
148 info!("Shutdown signal received, terminating graphql service");
149 })
150 .await
151 .map_err(|e| Error::Internal(format!("Server run failed: {e}")))
152 })
153 };
154
155 let _ = join!(
159 watermark_task,
160 system_package_task,
161 trigger_exchange_rates_task,
162 server_task
163 );
164
165 Ok(())
166 }
167}
168
/// Incrementally assembles a [`Server`]: schema data and extensions, the
/// router, and the handles needed by the background tasks.
pub(crate) struct ServerBuilder {
    /// Shared application state (configuration, metrics, cancellation token).
    state: AppState,
    /// Schema under construction; finished by `build_schema` / `build_components`.
    schema: SchemaBuilder<Query, Mutation, Subscription>,
    /// Router, created on first use by `init_router`.
    router: Option<Router>,
    /// Database handle; `build_components` panics if it is still `None`.
    db_reader: Option<Db>,
    /// Package resolver; `build_components` panics if it is still `None`.
    resolver: Option<PackageResolver>,
}
176
177#[derive(Clone)]
178pub(crate) struct AppState {
179 connection: ConnectionConfig,
180 service: ServiceConfig,
181 metrics: Metrics,
182 cancellation_token: CancellationToken,
183 pub version: Version,
184}
185
186impl AppState {
187 pub(crate) fn new(
188 connection: ConnectionConfig,
189 service: ServiceConfig,
190 metrics: Metrics,
191 cancellation_token: CancellationToken,
192 version: Version,
193 ) -> Self {
194 Self {
195 connection,
196 service,
197 metrics,
198 cancellation_token,
199 version,
200 }
201 }
202}
203
204impl FromRef<AppState> for ConnectionConfig {
205 fn from_ref(app_state: &AppState) -> ConnectionConfig {
206 app_state.connection.clone()
207 }
208}
209
210impl FromRef<AppState> for Metrics {
211 fn from_ref(app_state: &AppState) -> Metrics {
212 app_state.metrics.clone()
213 }
214}
215
216impl ServerBuilder {
217 pub fn new(state: AppState) -> Self {
218 Self {
219 state,
220 schema: schema_builder(),
221 router: None,
222 db_reader: None,
223 resolver: None,
224 }
225 }
226
227 pub fn address(&self) -> String {
228 format!(
229 "{}:{}",
230 self.state.connection.host, self.state.connection.port
231 )
232 }
233
234 pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
235 self.schema = self.schema.data(context_data);
236 self
237 }
238
239 pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
240 self.schema = self.schema.extension(extension);
241 self
242 }
243
244 fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
245 self.schema.finish()
246 }
247
248 fn build_components(
251 self,
252 ) -> (
253 String,
254 Schema<Query, Mutation, Subscription>,
255 Db,
256 PackageResolver,
257 Router,
258 ) {
259 let address = self.address();
260 let ServerBuilder {
261 state: _,
262 schema,
263 db_reader,
264 resolver,
265 router,
266 } = self;
267 (
268 address,
269 schema.finish(),
270 db_reader.expect("db reader not initialized"),
271 resolver.expect("package resolver not initialized"),
272 router.expect("router not initialized"),
273 )
274 }
275
276 fn init_router(&mut self) {
277 if self.router.is_none() {
278 let router: Router = Router::new()
279 .route("/", post(graphql_handler))
280 .route("/subscriptions", get(subscription_handler))
281 .route("/{version}", post(graphql_handler))
282 .route("/{version}/subscriptions", get(subscription_handler))
283 .route("/graphql", post(graphql_handler))
284 .route("/graphql/subscriptions", get(subscription_handler))
285 .route("/graphql/{version}", post(graphql_handler))
286 .route(
287 "/graphql/{version}/subscriptions",
288 get(subscription_handler),
289 )
290 .route("/health", get(health_check))
291 .route("/graphql/health", get(health_check))
292 .route("/graphql/{version}/health", get(health_check))
293 .with_state(self.state.clone())
294 .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
295 metrics: self.state.metrics.clone(),
296 }));
297 self.router = Some(router);
298 }
299 }
300
301 pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
302 self.init_router();
303 self.router = self.router.map(|router| router.route(path, method_handler));
304 self
305 }
306
307 pub fn layer<L>(mut self, layer: L) -> Self
308 where
309 L: Layer<Route> + Clone + Send + Sync + 'static,
310 L::Service: Service<Request<Body>> + Clone + Send + Sync + 'static,
311 <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
312 <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
313 <L::Service as Service<Request<Body>>>::Future: Send + 'static,
314 {
315 self.init_router();
316 self.router = self.router.map(|router| router.layer(layer));
317 self
318 }
319
320 fn cors() -> Result<CorsLayer, Error> {
321 let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
322 Ok(value) => {
323 let allow_hosts = value
324 .split(',')
325 .map(HeaderValue::from_str)
326 .collect::<Result<Vec<_>, _>>()
327 .map_err(|_| {
328 Error::Internal(
329 "Cannot resolve access control origin env variable".to_string(),
330 )
331 })?;
332 AllowOrigin::list(allow_hosts)
333 }
334 _ => AllowOrigin::any(),
335 };
336 info!("Access control allow origin set to: {acl:?}");
337
338 let cors = CorsLayer::new()
339 .allow_methods([Method::POST, Method::GET])
341 .allow_origin(acl)
343 .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
344 Ok(cors)
345 }
346
347 pub fn build(self) -> Result<Server, Error> {
349 let state = self.state.clone();
350 let (address, schema, db_reader, resolver, router) = self.build_components();
351
352 let watermark_task = WatermarkTask::new(
354 db_reader.clone(),
355 state.metrics.clone(),
356 std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
357 state.cancellation_token.clone(),
358 );
359
360 let system_package_task = SystemPackageTask::new(
361 resolver,
362 watermark_task.epoch_receiver(),
363 state.cancellation_token.clone(),
364 );
365
366 let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
367 db_reader,
368 watermark_task.epoch_receiver(),
369 state.cancellation_token.clone(),
370 );
371
372 let router = router
373 .route_layer(middleware::from_fn_with_state(
374 state.version,
375 set_version_middleware,
376 ))
377 .route_layer(middleware::from_fn_with_state(
378 state.version,
379 check_version_middleware,
380 ))
381 .layer(axum::extract::Extension(schema))
382 .layer(axum::extract::Extension(watermark_task.lock()))
383 .layer(Self::cors()?);
384
385 Ok(Server {
386 router,
387 address: address
388 .parse()
389 .map_err(|_| Error::Internal(format!("Failed to parse address {address}")))?,
390 watermark_task,
391 system_package_task,
392 trigger_exchange_rates_task,
393 state,
394 })
395 }
396
397 pub async fn from_config(
400 config: &ServerConfig,
401 version: &Version,
402 cancellation_token: CancellationToken,
403 ) -> Result<Self, Error> {
404 let prom_addr: SocketAddr = format!(
406 "{}:{}",
407 config.connection.prom_host, config.connection.prom_port
408 )
409 .parse()
410 .map_err(|_| {
411 Error::Internal(format!(
412 "Failed to parse url {}, port {} into socket address",
413 config.connection.prom_host, config.connection.prom_port
414 ))
415 })?;
416
417 let registry_service = iota_metrics::start_prometheus_server(prom_addr);
418 info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
419 let registry = registry_service.default_registry();
420 registry
421 .register(iota_metrics::uptime_metric(
422 "graphql",
423 version.full,
424 "unknown",
425 ))
426 .unwrap();
427
428 let metrics = Metrics::new(®istry);
430 let indexer_metrics = IndexerMetrics::new(®istry);
431 let state = AppState::new(
432 config.connection.clone(),
433 config.service.clone(),
434 metrics.clone(),
435 cancellation_token.clone(),
436 *version,
437 );
438 let mut builder = ServerBuilder::new(state);
439
440 let iota_names_config = config.service.iota_names.clone();
441 let reader = PgManager::reader_with_config(
442 config.connection.db_url.clone(),
443 config.connection.db_pool_size,
444 config.service.limits.request_timeout_ms.into(),
448 indexer_metrics.clone(),
449 cancellation_token,
450 )
451 .map_err(|e| Error::ServerInit(format!("Failed to create pg connection pool: {e}")))?;
452
453 if !config.connection.skip_migration_consistency_check {
454 info!("Starting compatibility check");
456 let mut connection = get_pool_connection(&reader.get_pool())?;
457 check_db_migration_consistency(&mut connection)?;
458 info!("Compatibility check passed");
459 }
460
461 let db = Db::new(
463 reader.clone(),
464 config.service.limits.clone(),
465 metrics.clone(),
466 config.connection.max_available_range,
467 );
468 let loader = DataLoader::new(db.clone());
469 let pg_conn_pool = PgManager::new(reader.clone());
470 let package_store = DbPackageStore::new(loader.clone());
471 let resolver = Arc::new(Resolver::new_with_limits(
472 PackageStoreWithLruCache::new(package_store),
473 config.service.limits.package_resolver_limits(),
474 ));
475
476 builder.db_reader = Some(db.clone());
477 builder.resolver = Some(resolver.clone());
478
479 let Some(fullnode_url) = config.tx_exec_full_node.node_rpc_url.as_ref() else {
480 return Err(Error::ServerInit(
481 "No fullnode url found in config".to_string(),
482 ));
483 };
484
485 let graphql_streams = GraphQLStream::new(reader.clone(), ®istry).await?;
486
487 let fullnode_grpc_client = GrpcClient::new(fullnode_url)
488 .await
489 .map_err(|e| Error::ServerInit(e.to_string()))?;
490
491 let read_api = ReadApi::new(reader.clone(), fullnode_grpc_client.clone());
492 let write_api = build_write_api(fullnode_grpc_client, reader, indexer_metrics).await?;
493
494 builder = builder
495 .context_data(config.service.clone())
496 .context_data(loader)
497 .context_data(db)
498 .context_data(pg_conn_pool)
499 .context_data(resolver)
500 .context_data(write_api)
501 .context_data(read_api)
502 .context_data(iota_names_config)
503 .context_data(metrics.clone())
504 .context_data(config.clone())
505 .context_data(graphql_streams)
506 .context_data(ChainIdentifierCache::default());
507
508 if config.internal_features.feature_gate {
509 builder = builder.extension(FeatureGate);
510 }
511
512 if config.internal_features.logger {
513 builder = builder.extension(Logger::default());
514 }
515
516 if config.internal_features.query_limits_checker {
517 builder = builder.extension(QueryLimitsChecker);
518 }
519
520 if config.internal_features.directive_checker {
521 builder = builder.extension(DirectiveChecker);
522 }
523
524 if config.internal_features.query_timeout {
525 builder = builder.extension(Timeout);
526 }
527
528 if config.internal_features.tracing {
529 builder = builder.extension(Tracing);
530 }
531
532 if config.internal_features.apollo_tracing {
533 builder = builder.extension(ApolloTracing);
534 }
535
536 Ok(builder)
540 }
541}
542
543fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
544 async_graphql::Schema::build(Query, Mutation, Subscription)
545 .limit_complexity(MAX_COMPLEXITY)
546 .register_output_type::<IMoveObject>()
547 .register_output_type::<IObject>()
548 .register_output_type::<IOwner>()
549 .register_output_type::<IMoveDatatype>()
550}
551
552pub fn export_schema() -> String {
554 schema_builder().finish().sdl()
555}
556
557async fn graphql_handler(
562 ConnectInfo(addr): ConnectInfo<SocketAddr>,
563 TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
564 schema: Extension<IotaGraphQLSchema>,
565 Extension(watermark_lock): Extension<WatermarkLock>,
566 headers: HeaderMap,
567 req: GraphQLRequest,
568) -> (axum::http::Extensions, GraphQLResponse) {
569 let mut req = req.into_inner();
570 req.data.insert(PayloadSize(content_length));
571 req.data.insert(Uuid::new_v4());
572 if headers.contains_key(ShowUsage::name()) {
573 req.data.insert(ShowUsage)
574 }
575
576 req.data.insert(addr);
580 req.data.insert(Watermark::new(watermark_lock).await);
581
582 let result = schema.execute(req).await;
583
584 let mut extensions = axum::http::Extensions::new();
587 if result.is_err() {
588 extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
589 };
590 (extensions, result.into())
591}
592
593pub(crate) fn get_write_api<'ctx>(
594 ctx: &'ctx Context<'_>,
595) -> Result<&'ctx OptimisticWriteApi, Error> {
596 ctx.data_opt()
597 .ok_or_else(|| Error::Internal("Unable to get node execution interface".to_string()))
598}
599
600async fn build_write_api(
601 fullnode_grpc_client: GrpcClient,
602 reader: IndexerReader,
603 metrics: IndexerMetrics,
604) -> Result<OptimisticWriteApi, Error> {
605 let indexer_store = PgIndexerStore::new(reader.get_pool(), metrics.clone());
606 let optimistic_tx_executor = OptimisticTransactionExecutor::new(
607 fullnode_grpc_client.clone(),
608 reader.clone(),
609 indexer_store,
610 metrics,
611 )
612 .await?;
613 Ok(OptimisticWriteApi::new(
614 WriteApi::new(fullnode_grpc_client, reader),
615 optimistic_tx_executor,
616 ))
617}
618
619async fn subscription_handler(
625 ConnectInfo(addr): ConnectInfo<SocketAddr>,
626 Extension(schema): Extension<IotaGraphQLSchema>,
627 req: http::Request<Body>,
628) -> AxumResponse {
629 let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
630 let (mut parts, _body) = req.into_parts();
631
632 let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
634 Ok(protocol) => protocol,
635 Err(err) => return err.into_response(),
636 };
637
638 let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
640 Ok(upgrade) => upgrade,
641 Err(err) => return err.into_response(),
642 };
643
644 let resp = upgrade
645 .protocols(ALL_WEBSOCKET_PROTOCOLS)
646 .on_upgrade(move |stream| async move {
647 let mut connection_data = Data::default();
649 connection_data.insert(PayloadSize(0));
651 connection_data.insert(Uuid::new_v4());
652 if headers_contains_show_usage {
653 connection_data.insert(ShowUsage)
654 }
655 connection_data.insert(addr);
656
657 let connection =
658 GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
659 connection.serve().await;
660 });
661 resp
662}
663
664#[derive(Clone)]
665struct MetricsMakeCallbackHandler {
666 metrics: Metrics,
667}
668
669impl MakeCallbackHandler for MetricsMakeCallbackHandler {
670 type Handler = MetricsCallbackHandler;
671
672 fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
673 let start = Instant::now();
674 let metrics = self.metrics.clone();
675
676 metrics.request_metrics.inflight_requests.inc();
677 metrics.inc_num_queries();
678
679 MetricsCallbackHandler { metrics, start }
680 }
681}
682
683struct MetricsCallbackHandler {
684 metrics: Metrics,
685 start: Instant,
686}
687
688impl ResponseHandler for MetricsCallbackHandler {
689 fn on_response(&mut self, response: &http::response::Parts) {
690 if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
691 self.metrics.inc_errors(&errors.0);
692 }
693 }
694
695 fn on_error<E>(&mut self, _error: &E) {
696 }
701}
702
703impl Drop for MetricsCallbackHandler {
704 fn drop(&mut self) {
705 self.metrics.query_latency(self.start.elapsed());
706 self.metrics.request_metrics.inflight_requests.dec();
707 }
708}
709
/// GraphQL errors of a response, stored as an HTTP response extension so that
/// `MetricsCallbackHandler::on_response` can count them.
#[derive(Debug, Clone)]
struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
712
713async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
715 let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
716 return StatusCode::INTERNAL_SERVER_ERROR;
717 };
718
719 let Some(host) = url.host_str() else {
720 return StatusCode::INTERNAL_SERVER_ERROR;
721 };
722
723 let tcp_url = if let Some(port) = url.port() {
724 format!("{host}:{port}")
725 } else {
726 host.to_string()
727 };
728
729 if TcpStream::connect(tcp_url).is_err() {
730 StatusCode::INTERNAL_SERVER_ERROR
731 } else {
732 StatusCode::OK
733 }
734}
735
/// Query parameters accepted by the health endpoints.
#[derive(serde::Deserialize)]
struct HealthParam {
    /// Maximum tolerated checkpoint lag in milliseconds; `health_check` falls
    /// back to `DEFAULT_MAX_CHECKPOINT_LAG` when absent.
    max_checkpoint_lag_ms: Option<u64>,
}
740
741async fn health_check(
747 State(connection): State<ConnectionConfig>,
748 Extension(watermark_lock): Extension<WatermarkLock>,
749 AxumQuery(query_params): AxumQuery<HealthParam>,
750) -> StatusCode {
751 let db_health_check = db_health_check(axum::extract::State(connection)).await;
752 if db_health_check != StatusCode::OK {
753 return db_health_check;
754 }
755
756 let max_checkpoint_lag_ms = query_params
757 .max_checkpoint_lag_ms
758 .map(Duration::from_millis)
759 .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
760
761 let checkpoint_timestamp =
762 Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);
763
764 let now_millis = Utc::now().timestamp_millis();
765
766 let now: Duration = match u64::try_from(now_millis) {
768 Ok(val) => Duration::from_millis(val),
769 Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
770 };
771
772 if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
773 return StatusCode::GATEWAY_TIMEOUT;
774 }
775
776 db_health_check
777}
778
779async fn get_or_init_server_start_time() -> &'static Instant {
781 static ONCE: OnceCell<Instant> = OnceCell::const_new();
782 ONCE.get_or_init(|| async move { Instant::now() }).await
783}
784
785pub mod tests {
786 use std::{sync::Arc, time::Duration};
787
788 use async_graphql::{
789 Request, Response, Variables,
790 extensions::{Extension, ExtensionContext, NextExecute},
791 };
792 use iota_types::transaction::{TransactionData, TransactionDataAPI};
793 use serde_json::json;
794 use uuid::Uuid;
795
796 use super::*;
797 use crate::{
798 config::{ConnectionConfig, Limits, ServiceConfig, Version},
799 context_data::db_data_provider::PgManager,
800 extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
801 test_infra::cluster::Cluster,
802 };
803
804 fn prep_schema(
808 connection_config: Option<ConnectionConfig>,
809 service_config: Option<ServiceConfig>,
810 ) -> ServerBuilder {
811 let connection_config = connection_config.unwrap_or_default();
812 let service_config = service_config.unwrap_or_default();
813
814 let reader = PgManager::reader_with_config(
815 connection_config.db_url.clone(),
816 connection_config.db_pool_size,
817 service_config.limits.request_timeout_ms.into(),
818 IndexerMetrics::new(&prometheus::Registry::new()),
819 CancellationToken::new(),
820 )
821 .expect("failed to create pg connection pool");
822
823 let version = Version::for_testing();
824 let metrics = metrics();
825
826 let db = Db::new(
827 reader.clone(),
828 service_config.limits.clone(),
829 metrics.clone(),
830 connection_config.max_available_range,
831 );
832 let loader = DataLoader::new(db.clone());
833 let pg_conn_pool = PgManager::new(reader);
834 let cancellation_token = CancellationToken::new();
835 let watermark = Watermark {
836 checkpoint: 1,
837 checkpoint_timestamp_ms: 1,
838 epoch: 0,
839 };
840 let state = AppState::new(
841 connection_config,
842 service_config.clone(),
843 metrics.clone(),
844 cancellation_token,
845 version,
846 );
847 ServerBuilder::new(state)
848 .context_data(db)
849 .context_data(loader)
850 .context_data(pg_conn_pool)
851 .context_data(service_config)
852 .context_data(query_id())
853 .context_data(ip_address())
854 .context_data(watermark)
855 .context_data(ChainIdentifierCache::default())
856 .context_data(metrics)
857 }
858
859 fn metrics() -> Metrics {
860 let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap();
861 let registry = iota_metrics::start_prometheus_server(binding_address).default_registry();
862 Metrics::new(®istry)
863 }
864
865 fn ip_address() -> SocketAddr {
866 let binding_address: SocketAddr = "0.0.0.0:51515".parse().unwrap();
867 binding_address
868 }
869
    /// Returns a fresh random (v4) UUID, registered as the query id in tests.
    fn query_id() -> Uuid {
        Uuid::new_v4()
    }
873
874 pub async fn test_timeout_impl(cluster: &Cluster) {
875 struct TimedExecuteExt {
876 pub min_req_delay: Duration,
877 }
878
879 impl ExtensionFactory for TimedExecuteExt {
880 fn create(&self) -> Arc<dyn Extension> {
881 Arc::new(TimedExecuteExt {
882 min_req_delay: self.min_req_delay,
883 })
884 }
885 }
886
887 #[async_trait::async_trait]
888 impl Extension for TimedExecuteExt {
889 async fn execute(
890 &self,
891 ctx: &ExtensionContext<'_>,
892 operation_name: Option<&str>,
893 next: NextExecute<'_>,
894 ) -> Response {
895 tokio::time::sleep(self.min_req_delay).await;
896 next.run(ctx, operation_name).await
897 }
898 }
899
900 async fn test_timeout(
901 delay: Duration,
902 timeout: Duration,
903 query: &str,
904 write_api: OptimisticWriteApi,
905 ) -> Response {
906 let mut cfg = ServiceConfig::default();
907 cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
908 cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
909
910 let schema = prep_schema(None, Some(cfg))
911 .context_data(write_api)
912 .extension(Timeout)
913 .extension(TimedExecuteExt {
914 min_req_delay: delay,
915 })
916 .build_schema();
917
918 schema.execute(query).await
919 }
920
921 let wallet = &cluster.validator_fullnode_handle.wallet;
922 let store = &cluster.indexer_store;
923 let fn_grpc_url = &cluster.validator_fullnode_handle.grpc_url();
924 let indexer_metrics = store.get_metrics();
925
926 let indexer_reader =
928 iota_indexer::read::IndexerReader::new_without_watermark_cache(store.blocking_cp());
929 let fullnode_gpc_client = GrpcClient::new(fn_grpc_url).await.unwrap();
930
931 let optimistic_tx_executor =
932 iota_indexer::optimistic_indexing::OptimisticTransactionExecutor::new(
933 fullnode_gpc_client.clone(),
934 indexer_reader.clone(),
935 store.clone(),
936 indexer_metrics,
937 )
938 .await
939 .unwrap();
940 let write_api = OptimisticWriteApi::new(
941 WriteApi::new(fullnode_gpc_client, indexer_reader),
942 optimistic_tx_executor,
943 );
944
945 let query = "{ chainIdentifier }";
946 let timeout = Duration::from_millis(1000);
947 let delay = Duration::from_millis(100);
948
949 test_timeout(delay, timeout, query, write_api.clone())
950 .await
951 .into_result()
952 .expect("should complete successfully");
953
954 let errs: Vec<_> = test_timeout(delay, delay, query, write_api.clone())
956 .await
957 .into_result()
958 .unwrap_err()
959 .into_iter()
960 .map(|e| e.message)
961 .collect();
962 let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
963 assert_eq!(errs, vec![exp]);
964
965 let addresses = wallet.get_addresses();
969 let gas = wallet
970 .get_one_gas_object_owned_by_address(addresses[0])
971 .await
972 .unwrap();
973 let tx_data = TransactionData::new_transfer_iota(
974 addresses[1],
975 addresses[0],
976 Some(1000),
977 gas.unwrap(),
978 1_000_000,
979 wallet.get_reference_gas_price().await.unwrap(),
980 );
981
982 let tx = wallet.sign_transaction(&tx_data);
983 let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
984
985 let signature_base64 = &signatures[0];
986 let query = format!(
987 r#"
988 mutation {{
989 executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
990 effects {{
991 status
992 }}
993 }}
994 }}"#,
995 tx_bytes.encoded(),
996 signature_base64.encoded()
997 );
998 let errs: Vec<_> = test_timeout(delay, delay, &query, write_api.clone())
999 .await
1000 .into_result()
1001 .unwrap_err()
1002 .into_iter()
1003 .map(|e| e.message)
1004 .collect();
1005 let exp = format!(
1006 "Mutation request timed out. Limit: {}s",
1007 delay.as_secs_f32()
1008 );
1009 assert_eq!(errs, vec![exp]);
1010 }
1011
1012 pub async fn test_query_depth_limit_impl() {
1013 async fn exec_query_depth_limit(depth: u32, query: &str) -> Response {
1014 let service_config = ServiceConfig {
1015 limits: Limits {
1016 max_query_depth: depth,
1017 ..Default::default()
1018 },
1019 ..Default::default()
1020 };
1021
1022 let schema = prep_schema(None, Some(service_config))
1023 .context_data(PayloadSize(100))
1024 .extension(QueryLimitsChecker)
1025 .build_schema();
1026 schema.execute(query).await
1027 }
1028
1029 exec_query_depth_limit(1, "{ chainIdentifier }")
1030 .await
1031 .into_result()
1032 .expect("should complete successfully");
1033
1034 exec_query_depth_limit(
1035 5,
1036 "{ chainIdentifier protocolConfig { configs { value key }} }",
1037 )
1038 .await
1039 .into_result()
1040 .expect("should complete successfully");
1041
1042 let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }")
1044 .await
1045 .into_result()
1046 .unwrap_err()
1047 .into_iter()
1048 .map(|e| e.message)
1049 .collect();
1050
1051 assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
1052 let errs: Vec<_> = exec_query_depth_limit(
1053 2,
1054 "{ chainIdentifier protocolConfig { configs { value key }} }",
1055 )
1056 .await
1057 .into_result()
1058 .unwrap_err()
1059 .into_iter()
1060 .map(|e| e.message)
1061 .collect();
1062 assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
1063 }
1064
1065 pub async fn test_query_node_limit_impl() {
1066 async fn exec_query_node_limit(nodes: u32, query: &str) -> Response {
1067 let service_config = ServiceConfig {
1068 limits: Limits {
1069 max_query_nodes: nodes,
1070 ..Default::default()
1071 },
1072 ..Default::default()
1073 };
1074
1075 let schema = prep_schema(None, Some(service_config))
1076 .context_data(PayloadSize(100))
1077 .extension(QueryLimitsChecker)
1078 .build_schema();
1079 schema.execute(query).await
1080 }
1081
1082 exec_query_node_limit(1, "{ chainIdentifier }")
1083 .await
1084 .into_result()
1085 .expect("should complete successfully");
1086
1087 exec_query_node_limit(
1088 5,
1089 "{ chainIdentifier protocolConfig { configs { value key }} }",
1090 )
1091 .await
1092 .into_result()
1093 .expect("should complete successfully");
1094
1095 let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }")
1097 .await
1098 .into_result()
1099 .unwrap_err()
1100 .into_iter()
1101 .map(|e| e.message)
1102 .collect();
1103 assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1104
1105 let err: Vec<_> = exec_query_node_limit(
1106 4,
1107 "{ chainIdentifier protocolConfig { configs { value key }} }",
1108 )
1109 .await
1110 .into_result()
1111 .unwrap_err()
1112 .into_iter()
1113 .map(|e| e.message)
1114 .collect();
1115 assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1116 }
1117
1118 pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) {
1119 let service_config = ServiceConfig {
1120 limits: Limits {
1121 default_page_size: 1,
1122 ..Default::default()
1123 },
1124 ..Default::default()
1125 };
1126 let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema();
1127
1128 let resp = schema
1129 .execute("{ checkpoints { nodes { sequenceNumber } } }")
1130 .await;
1131 let data = resp.data.clone().into_json().unwrap();
1132 let checkpoints = data
1133 .get("checkpoints")
1134 .unwrap()
1135 .get("nodes")
1136 .unwrap()
1137 .as_array()
1138 .unwrap();
1139 assert_eq!(
1140 checkpoints.len(),
1141 1,
1142 "Checkpoints should have exactly one element"
1143 );
1144
1145 let resp = schema
1146 .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1147 .await;
1148 let data = resp.data.into_json().unwrap();
1149 let checkpoints = data
1150 .get("checkpoints")
1151 .unwrap()
1152 .get("nodes")
1153 .unwrap()
1154 .as_array()
1155 .unwrap();
1156 assert_eq!(
1157 checkpoints.len(),
1158 2,
1159 "Checkpoints should return two elements"
1160 );
1161 }
1162
1163 pub async fn test_query_max_page_limit_impl() {
1164 let schema = prep_schema(None, None).build_schema();
1165
1166 schema
1167 .execute("{ objects(first: 1) { nodes { version } } }")
1168 .await
1169 .into_result()
1170 .expect("should complete successfully");
1171
1172 let err: Vec<_> = schema
1174 .execute("{ objects(first: 51) { nodes { version } } }")
1175 .await
1176 .into_result()
1177 .unwrap_err()
1178 .into_iter()
1179 .map(|e| e.message)
1180 .collect();
1181 assert_eq!(
1182 err,
1183 vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1184 );
1185 }
1186
1187 pub async fn test_query_complexity_metrics_impl() {
1188 let server_builder = prep_schema(None, None).context_data(PayloadSize(100));
1189 let metrics = server_builder.state.metrics.clone();
1190 let schema = server_builder
1191 .extension(QueryLimitsChecker) .build_schema();
1193
1194 schema
1195 .execute("{ chainIdentifier }")
1196 .await
1197 .into_result()
1198 .expect("should complete successfully");
1199
1200 let req_metrics = metrics.request_metrics;
1201 assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1202 assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1203 assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1204 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1205 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1206 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1207
1208 schema
1209 .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1210 .await
1211 .into_result()
1212 .expect("should complete successfully");
1213
1214 assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1215 assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1216 assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1217 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1218 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1219 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1220 }
1221
1222 pub async fn test_health_check_impl() {
1223 let server_builder = prep_schema(None, None);
1224 let url = format!(
1225 "http://{}:{}/health",
1226 server_builder.state.connection.host, server_builder.state.connection.port
1227 );
1228 server_builder.build_schema();
1229
1230 let resp = reqwest::get(&url).await.unwrap();
1231 assert_eq!(resp.status(), reqwest::StatusCode::OK);
1232
1233 let url_with_param = format!("{url}?max_checkpoint_lag_ms=1");
1234 let resp = reqwest::get(&url_with_param).await.unwrap();
1235 assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1236 }
1237
1238 async fn execute_for_error(limits: Limits, request: Request) -> String {
1241 let service_config = ServiceConfig {
1242 limits,
1243 ..Default::default()
1244 };
1245
1246 let schema = prep_schema(None, Some(service_config))
1247 .context_data(PayloadSize(
1248 serde_json::to_string(&request).unwrap().len() as u64,
1253 ))
1254 .extension(QueryLimitsChecker)
1255 .build_schema();
1256
1257 let errs: Vec<_> = schema
1258 .execute(request)
1259 .await
1260 .into_result()
1261 .unwrap_err()
1262 .into_iter()
1263 .map(|e| e.message)
1264 .collect();
1265
1266 errs.join("\n")
1267 }
1268
1269 pub async fn test_payload_read_exceeded_impl() {
1270 assert_eq!(
1271 execute_for_error(
1272 Limits {
1273 max_tx_payload_size: 400,
1274 max_query_payload_size: 10,
1275 ..Default::default()
1276 },
1277 r#"
1278 mutation {
1279 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1280 effects {
1281 status
1282 }
1283 }
1284 }
1285 "#
1286 .into()
1287 )
1288 .await,
1289 "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1290 transaction payloads (all inputs to executeTransactionBlock or \
1291 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 \
1292 bytes or fewer."
1293 );
1294 }
1295
1296 pub async fn test_payload_mutation_exceeded_impl() {
1297 assert_eq!(
1298 execute_for_error(
1299 Limits {
1300 max_tx_payload_size: 10,
1301 max_query_payload_size: 400,
1302 ..Default::default()
1303 },
1304 r#"
1305 mutation {
1306 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1307 effects {
1308 status
1309 }
1310 }
1311 }
1312 "#
1313 .into()
1314 )
1315 .await,
1316 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1317 transaction payloads (all inputs to executeTransactionBlock or \
1318 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1319 bytes or fewer."
1320 );
1321 }
1322
1323 pub async fn test_payload_dry_run_exceeded_impl() {
1324 assert_eq!(
1325 execute_for_error(
1326 Limits {
1327 max_tx_payload_size: 10,
1328 max_query_payload_size: 400,
1329 ..Default::default()
1330 },
1331 r#"
1332 query {
1333 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1334 error
1335 transaction {
1336 digest
1337 }
1338 }
1339 }
1340 "#
1341 .into(),
1342 )
1343 .await,
1344 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1345 transaction payloads (all inputs to executeTransactionBlock or \
1346 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 bytes \
1347 or fewer."
1348 );
1349 }
1350
1351 pub async fn test_payload_total_exceeded_impl() {
1352 assert_eq!(
1353 execute_for_error(
1354 Limits {
1355 max_tx_payload_size: 10,
1356 max_query_payload_size: 10,
1357 ..Default::default()
1358 },
1359 r#"
1360 query {
1361 dryRunTransactionBlock(txByte: "AAABBB") {
1362 error
1363 transaction {
1364 digest
1365 }
1366 }
1367 }
1368 "#
1369 .into(),
1370 )
1371 .await,
1372 "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1373 transaction payloads (all inputs to executeTransactionBlock or dryRunTransactionBlock) \
1374 and the rest of the request (the query part) must be 10 bytes or fewer."
1375 );
1376 }
1377
1378 pub async fn test_payload_using_vars_mutation_exceeded_impl() {
1379 assert_eq!(
1380 execute_for_error(
1381 Limits {
1382 max_tx_payload_size: 10,
1383 max_query_payload_size: 500,
1384 ..Default::default()
1385 },
1386 Request::new(
1387 r#"
1388 mutation ($tx: String!, $sigs: [String!]!) {
1389 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1390 effects {
1391 status
1392 }
1393 }
1394 }
1395 "#
1396 )
1397 .variables(Variables::from_json(json!({
1398 "tx": "AAABBBCCC",
1399 "sigs": ["BBB"]
1400 })))
1401 )
1402 .await,
1403 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1404 transaction payloads (all inputs to executeTransactionBlock or \
1405 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1406 bytes or fewer."
1407 );
1408 }
1409
1410 pub async fn test_payload_using_vars_read_exceeded_impl() {
1411 assert_eq!(
1412 execute_for_error(
1413 Limits {
1414 max_tx_payload_size: 500,
1415 max_query_payload_size: 10,
1416 ..Default::default()
1417 },
1418 Request::new(
1419 r#"
1420 mutation ($tx: String!, $sigs: [String!]!) {
1421 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1422 effects {
1423 status
1424 }
1425 }
1426 }
1427 "#
1428 )
1429 .variables(Variables::from_json(json!({
1430 "tx": "AAA",
1431 "sigs": ["BBB"]
1432 })))
1433 )
1434 .await,
1435 "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1436 transaction payloads (all inputs to executeTransactionBlock or \
1437 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1438 or fewer."
1439 );
1440 }
1441
1442 pub async fn test_payload_using_vars_dry_run_exceeded_impl() {
1443 assert_eq!(
1444 execute_for_error(
1445 Limits {
1446 max_tx_payload_size: 10,
1447 max_query_payload_size: 400,
1448 ..Default::default()
1449 },
1450 Request::new(
1451 r#"
1452 query ($tx: String!) {
1453 dryRunTransactionBlock(txBytes: $tx) {
1454 error
1455 transaction {
1456 digest
1457 }
1458 }
1459 }
1460 "#
1461 )
1462 .variables(Variables::from_json(json!({
1463 "tx": "AAABBBCCC"
1464 }))),
1465 )
1466 .await,
1467 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1468 transaction payloads (all inputs to executeTransactionBlock or \
1469 dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1470 bytes or fewer."
1471 );
1472 }
1473
1474 pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() {
1475 assert_eq!(
1476 execute_for_error(
1477 Limits {
1478 max_tx_payload_size: 400,
1479 max_query_payload_size: 10,
1480 ..Default::default()
1481 },
1482 Request::new(
1483 r#"
1484 query ($tx: String!) {
1485 dryRunTransactionBlock(txBytes: $tx) {
1486 error
1487 transaction {
1488 digest
1489 }
1490 }
1491 }
1492 "#
1493 )
1494 .variables(Variables::from_json(json!({
1495 "tx": "AAABBBCCC"
1496 }))),
1497 )
1498 .await,
1499 "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1500 transaction payloads (all inputs to executeTransactionBlock or \
1501 dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1502 or fewer."
1503 );
1504 }
1505
1506 pub async fn test_payload_multiple_execution_exceeded_impl() {
1507 let err = execute_for_error(
1510 Limits {
1511 max_tx_payload_size: 30,
1512 max_query_payload_size: 320,
1513 ..Default::default()
1514 },
1515 r#"
1516 mutation {
1517 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1518 effects {
1519 status
1520 }
1521 }
1522 }
1523 "#
1524 .into(),
1525 )
1526 .await;
1527 assert!(err.starts_with("Query part too large"), "{err}");
1528
1529 assert_eq!(
1530 execute_for_error(
1531 Limits {
1532 max_tx_payload_size: 30,
1533 max_query_payload_size: 800,
1534 ..Default::default()
1535 },
1536 r#"
1537 mutation {
1538 e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1539 effects {
1540 status
1541 }
1542 }
1543 e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1544 effects {
1545 status
1546 }
1547 }
1548 }
1549 "#
1550 .into()
1551 )
1552 .await,
1553 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1554 transaction payloads (all inputs to executeTransactionBlock or \
1555 dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1556 bytes or fewer."
1557 );
1558 }
1559
1560 pub async fn test_payload_multiple_dry_run_exceeded_impl() {
1561 let err = execute_for_error(
1564 Limits {
1565 max_tx_payload_size: 20,
1566 max_query_payload_size: 330,
1567 ..Default::default()
1568 },
1569 r#"
1570 query {
1571 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1572 error
1573 transaction {
1574 digest
1575 }
1576 }
1577 }
1578 "#
1579 .into(),
1580 )
1581 .await;
1582 assert!(err.starts_with("Query part too large"), "{err}");
1583
1584 assert_eq!(
1585 execute_for_error(
1586 Limits {
1587 max_tx_payload_size: 20,
1588 max_query_payload_size: 800,
1589 ..Default::default()
1590 },
1591 r#"
1592 query {
1593 d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1594 error
1595 transaction {
1596 digest
1597 }
1598 }
1599 d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1600 error
1601 transaction {
1602 digest
1603 }
1604 }
1605 }
1606 "#
1607 .into()
1608 )
1609 .await,
1610 "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1611 transaction payloads (all inputs to executeTransactionBlock or \
1612 dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1613 bytes or fewer."
1614 );
1615 }
1616
1617 pub async fn test_payload_execution_multiple_sigs_exceeded_impl() {
1618 let err = execute_for_error(
1621 Limits {
1622 max_tx_payload_size: 30,
1623 max_query_payload_size: 320,
1624 ..Default::default()
1625 },
1626 r#"
1627 mutation {
1628 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1629 effects {
1630 status
1631 }
1632 }
1633 }
1634 "#
1635 .into(),
1636 )
1637 .await;
1638
1639 assert!(err.starts_with("Query part too large"), "{err}");
1640
1641 assert_eq!(
1642 execute_for_error(
1643 Limits {
1644 max_tx_payload_size: 30,
1645 max_query_payload_size: 500,
1646 ..Default::default()
1647 },
1648 r#"
1649 mutation {
1650 executeTransactionBlock(
1651 txBytes: "AAA",
1652 signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1653 ) {
1654 effects {
1655 status
1656 }
1657 }
1658 }
1659 "#
1660 .into(),
1661 )
1662 .await,
1663 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1664 transaction payloads (all inputs to executeTransactionBlock or \
1665 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1666 bytes or fewer.",
1667 )
1668 }
1669
1670 pub async fn test_payload_sig_var_execution_exceeded_impl() {
1671 assert_eq!(
1674 execute_for_error(
1675 Limits {
1676 max_tx_payload_size: 10,
1677 max_query_payload_size: 500,
1678 ..Default::default()
1679 },
1680 Request::new(
1681 r#"
1682 mutation ($tx: String!, $sig: String!) {
1683 executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1684 effects {
1685 status
1686 }
1687 }
1688 }
1689 "#
1690 )
1691 .variables(Variables::from_json(json!({
1692 "tx": "AAA",
1693 "sig": "BBB"
1694 })))
1695 )
1696 .await,
1697 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1698 transaction payloads (all inputs to executeTransactionBlock or \
1699 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1700 bytes or fewer."
1701 );
1702 }
1703
1704 fn passed_tx_checks(err: &str) -> bool {
1707 !err.starts_with("Overall request too large")
1708 && !err.starts_with("Transaction payload too large")
1709 }
1710
1711 pub async fn test_payload_reusing_vars_execution_impl() {
1712 assert!(!passed_tx_checks(
1718 &execute_for_error(
1719 Limits {
1720 max_tx_payload_size: 1,
1721 max_query_payload_size: 1,
1722 ..Default::default()
1723 },
1724 r#"
1725 mutation {
1726 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1727 effects {
1728 status
1729 }
1730 }
1731 }
1732 "#
1733 .into()
1734 )
1735 .await
1736 ));
1737
1738 let limits = Limits {
1739 max_tx_payload_size: 20,
1740 max_query_payload_size: 1000,
1741 ..Default::default()
1742 };
1743
1744 assert!(passed_tx_checks(
1747 &execute_for_error(
1748 limits.clone(),
1749 Request::new(
1750 r#"
1751 mutation ($sig: String!) {
1752 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1753 effects {
1754 status
1755 }
1756 }
1757 }
1758 "#,
1759 )
1760 .variables(Variables::from_json(json!({
1761 "sig": "BBB"
1762 })))
1763 )
1764 .await
1765 ));
1766
1767 let execution_result = execute_for_error(
1770 limits.clone(),
1771 Request::new(
1772 r#"
1773 mutation ($sig: String!) {
1774 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1775 effects {
1776 status
1777 }
1778 }
1779 }
1780 "#,
1781 )
1782 .variables(Variables::from_json(json!({
1783 "sig": "BBB"
1784 }))),
1785 )
1786 .await;
1787 assert!(!passed_tx_checks(&execution_result), "{execution_result}");
1788
1789 let execution_result = execute_for_error(
1792 limits,
1793 Request::new(
1794 r#"
1795 mutation ($sig: String!) {
1796 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1797 effects {
1798 status
1799 }
1800 }
1801 }
1802 "#,
1803 )
1804 .variables(Variables::from_json(json!({
1805 "sig": "BBB"
1806 }))),
1807 )
1808 .await;
1809 assert!(passed_tx_checks(&execution_result), "{execution_result}");
1810 }
1811
1812 pub async fn test_payload_reusing_vars_dry_run_impl() {
1813 let limits = Limits {
1817 max_tx_payload_size: 20,
1818 max_query_payload_size: 1000,
1819 ..Default::default()
1820 };
1821
1822 assert!(passed_tx_checks(
1824 &execute_for_error(
1825 limits.clone(),
1826 Request::new(
1827 r#"
1828 query ($tx: String!) {
1829 dryRunTransactionBlock(txBytes: $tx) {
1830 error
1831 transaction {
1832 digest
1833 }
1834 }
1835 }
1836 "#,
1837 )
1838 .variables(Variables::from_json(json!({
1839 "tx": "AAABBBCCC"
1840 })))
1841 )
1842 .await
1843 ));
1844
1845 assert!(!passed_tx_checks(
1847 &execute_for_error(
1848 limits.clone(),
1849 Request::new(
1850 r#"
1851 query ($tx: String!) {
1852 d0: dryRunTransactionBlock(txBytes: $tx) {
1853 error
1854 transaction {
1855 digest
1856 }
1857 }
1858
1859 d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1860 error
1861 transaction {
1862 digest
1863 }
1864 }
1865 }
1866 "#,
1867 )
1868 .variables(Variables::from_json(json!({
1869 "tx": "AAABBBCCC"
1870 })))
1871 )
1872 .await
1873 ));
1874
1875 assert!(passed_tx_checks(
1877 &execute_for_error(
1878 limits,
1879 Request::new(
1880 r#"
1881 query ($tx: String!) {
1882 d0: dryRunTransactionBlock(txBytes: $tx) {
1883 error
1884 transaction {
1885 digest
1886 }
1887 }
1888
1889 d1: dryRunTransactionBlock(txBytes: $tx) {
1890 error
1891 transaction {
1892 digest
1893 }
1894 }
1895 }
1896 "#,
1897 )
1898 .variables(Variables::from_json(json!({
1899 "tx": "AAABBBCCC"
1900 })))
1901 )
1902 .await
1903 ));
1904 }
1905
1906 pub async fn test_payload_named_fragment_execution_exceeded_impl() {
1907 assert_eq!(
1908 execute_for_error(
1909 Limits {
1910 max_tx_payload_size: 10,
1911 max_query_payload_size: 500,
1912 ..Default::default()
1913 },
1914 r#"
1915 mutation {
1916 ...Tx
1917 }
1918
1919 fragment Tx on Mutation {
1920 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1921 effects {
1922 status
1923 }
1924 }
1925 }
1926 "#
1927 .into()
1928 )
1929 .await,
1930 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1931 transaction payloads (all inputs to executeTransactionBlock or \
1932 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1933 bytes or fewer."
1934 );
1935 }
1936
1937 pub async fn test_payload_inline_fragment_execution_exceeded_impl() {
1938 assert_eq!(
1939 execute_for_error(
1940 Limits {
1941 max_tx_payload_size: 10,
1942 max_query_payload_size: 500,
1943 ..Default::default()
1944 },
1945 r#"
1946 mutation {
1947 ... on Mutation {
1948 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1949 effects {
1950 status
1951 }
1952 }
1953 }
1954 }
1955 "#
1956 .into()
1957 )
1958 .await,
1959 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1960 transaction payloads (all inputs to executeTransactionBlock or \
1961 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1962 bytes or fewer."
1963 );
1964 }
1965
1966 pub async fn test_payload_named_fragment_dry_run_exceeded_impl() {
1967 assert_eq!(
1968 execute_for_error(
1969 Limits {
1970 max_tx_payload_size: 10,
1971 max_query_payload_size: 500,
1972 ..Default::default()
1973 },
1974 r#"
1975 query {
1976 ...DryRun
1977 }
1978
1979 fragment DryRun on Query {
1980 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1981 error
1982 transaction {
1983 digest
1984 }
1985 }
1986 }
1987 "#
1988 .into(),
1989 )
1990 .await,
1991 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1992 transaction payloads (all inputs to executeTransactionBlock or \
1993 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1994 bytes or fewer."
1995 );
1996 }
1997
1998 pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() {
1999 assert_eq!(
2000 execute_for_error(
2001 Limits {
2002 max_tx_payload_size: 10,
2003 max_query_payload_size: 500,
2004 ..Default::default()
2005 },
2006 r#"
2007 query {
2008 ... on Query {
2009 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2010 error
2011 transaction {
2012 digest
2013 }
2014 }
2015 }
2016 }
2017 "#
2018 .into(),
2019 )
2020 .await,
2021 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2022 transaction payloads (all inputs to executeTransactionBlock or \
2023 dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
2024 bytes or fewer."
2025 );
2026 }
2027
2028 pub async fn test_payload_object_input_dry_run_exceeded_impl() {
2029 assert_eq!(
2034 execute_for_error(
2035 Limits {
2036 max_tx_payload_size: 500,
2037 max_query_payload_size: 10_000,
2038 ..Default::default()
2039 },
2040 Request::new(
2041 r#"
2042 query DryRunQuery(
2043 $txBytes: String!,
2044 $skipChecks: Boolean!,
2045 $txMeta: TransactionMetadata
2046 ) {
2047 dryRunTransactionBlock(
2048 txBytes: $txBytes,
2049 skipChecks: $skipChecks,
2050 txMeta: $txMeta
2051 ) {
2052 error
2053 transaction {
2054 digest
2055 }
2056 }
2057 }
2058 "#,
2059 )
2060 .variables(Variables::from_json(json!({
2061 "txBytes": "AAIAINdb/wdoZKClk3YWgTp2SRpvVYkX9XUMu0Im6dGMv5g1AAjoAwAAAAAAAAICAAEBAQABAQIAAAEAAA==",
2062 "skipChecks": false,
2063 "txMeta": {
2064 "gasBudget": null,
2065 "gasPrice": 1000,
2066 "gasSponsor": "0x2c4a2ee4a1cb67c2ae7a6215784a26bb9c5051ea3f9aa5f60ccc58613a94f70b",
2067 "sender": "0x2c4a2ee4a1cb67c2ae7a6215784a26bb9c5051ea3f9aa5f60ccc58613a94f70b",
2068 "gasObjects": [
2069 { "address": "0x0f49b390361c7b85f50ea436dd2f78d07794450162fa543d9b37d4fd5d3c9884", "digest": "3Hij17jtvK4o7VxarnBJkX31LYLtV7KDecLBhmBJuapU", "version": 3 },
2070 { "address": "0x115d51e2df3dae3fcb4a8d71ea9e357cec509e2f8e2f604eea53bb7abcd68c2b", "digest": "9t4C9FXKFdpgpZa8L968KtgkCPHwRA2QFHz6gVrymVJu", "version": 3 },
2071 { "address": "0x18c9d0de646c02aaea280eb4aafabbc6855cf6efb7c423714428318c42fb35be", "digest": "FguxMqj4pLitqCQLhGrEja65HT5r7749h8CPd3JzSnnk", "version": 3 }
2072 ]
2073 }
2074 })))
2075 )
2076 .await,
2077 "Transaction payload too large. Requests are limited to 500 bytes or fewer on \
2078 transaction payloads (all inputs to executeTransactionBlock or \
2079 dryRunTransactionBlock) and the rest of the request (the query part) must be 10000 \
2080 bytes or fewer."
2081 );
2082 }
2083}