iota_graphql_rpc/extensions/
logger.rs1use std::{
6 fmt::Write,
7 net::SocketAddr,
8 sync::{
9 Arc,
10 atomic::{AtomicBool, Ordering},
11 },
12};
13
14use async_graphql::{
15 PathSegment, Response, ServerError, ServerResult, ValidationResult, Variables,
16 extensions::{
17 Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextResolve,
18 NextSubscribe, NextValidation, ResolveInfo,
19 },
20 parser::types::{ExecutableDocument, OperationType, Selection},
21};
22use async_graphql_value::ConstValue;
23use futures::stream::BoxStream;
24use tracing::{debug, error, info, warn};
25use uuid::Uuid;
26
27use crate::{error::code, metrics::Metrics};
28
29#[derive(Clone, Debug)]
30pub struct LoggerConfig {
31 pub log_request_query: bool,
32 pub log_response: bool,
33 pub log_complexity: bool,
34}
35
36impl Default for LoggerConfig {
37 fn default() -> Self {
38 Self {
39 log_request_query: false,
40 log_response: true,
41 log_complexity: true,
42 }
43 }
44}
45
46#[derive(Clone, Debug, Default)]
47pub struct Logger {
48 config: LoggerConfig,
49}
50
51impl ExtensionFactory for Logger {
52 fn create(&self) -> Arc<dyn Extension> {
53 Arc::new(LoggerExtension {
54 config: self.config.clone(),
55 is_subscription: AtomicBool::default(),
56 })
57 }
58}
59
60struct LoggerExtension {
61 config: LoggerConfig,
62 is_subscription: AtomicBool,
64}
65
66#[async_trait::async_trait]
67impl Extension for LoggerExtension {
68 fn subscribe<'s>(
69 &self,
70 ctx: &ExtensionContext<'_>,
71 stream: BoxStream<'s, Response>,
72 next: NextSubscribe<'_>,
73 ) -> BoxStream<'s, Response> {
74 self.is_subscription.store(true, Ordering::Relaxed);
77 next.run(ctx, stream)
78 }
79
80 async fn resolve(
83 &self,
84 ctx: &ExtensionContext<'_>,
85 info: ResolveInfo<'_>,
86 next: NextResolve<'_>,
87 ) -> ServerResult<Option<ConstValue>> {
88 if info.path_node.parent.is_none() {
89 ctx.data_unchecked::<Metrics>()
90 .request_metrics
91 .num_queries_top_level
92 .with_label_values(&[info.name])
93 .inc();
94 }
95 next.run(ctx, info).await
96 }
97
98 async fn parse_query(
99 &self,
100 ctx: &ExtensionContext<'_>,
101 query: &str,
102 variables: &Variables,
103 next: NextParseQuery<'_>,
104 ) -> ServerResult<ExecutableDocument> {
105 let document = next.run(ctx, query, variables).await?;
106 let is_schema = document
107 .operations
108 .iter()
109 .filter(|(_, operation)| operation.node.ty == OperationType::Query)
110 .any(|(_, operation)| operation.node.selection_set.node.items.iter().any(|selection| matches!(&selection.node, Selection::Field(field) if field.node.name.node == "__schema")));
111 let query_id: &Uuid = ctx.data_unchecked();
112 let session_id: &SocketAddr = ctx.data_unchecked();
113 if !is_schema && self.config.log_request_query {
114 info!(
115 %query_id,
116 %session_id,
117 "[Query] {}",
118 ctx.stringify_execute_doc(&document, variables)
119 );
120 }
121 Ok(document)
122 }
123
124 async fn validation(
125 &self,
126 ctx: &ExtensionContext<'_>,
127 next: NextValidation<'_>,
128 ) -> Result<ValidationResult, Vec<ServerError>> {
129 let res = next.run(ctx).await?;
130 let query_id: &Uuid = ctx.data_unchecked();
131 let session_id: &SocketAddr = ctx.data_unchecked();
132 if self.config.log_complexity {
133 info!(
134 %query_id,
135 %session_id,
136 complexity = res.complexity,
137 depth = res.depth,
138 "[Validation]",
139 );
140 }
141 Ok(res)
142 }
143
144 async fn execute(
145 &self,
146 ctx: &ExtensionContext<'_>,
147 operation_name: Option<&str>,
148 next: NextExecute<'_>,
149 ) -> Response {
150 let resp = next.run(ctx, operation_name).await;
151 let query_id: &Uuid = ctx.data_unchecked();
152 let session_id: &SocketAddr = ctx.data_unchecked();
153 if resp.is_err() {
154 for err in &resp.errors {
155 let error_code = &err.extensions.as_ref().and_then(|x| x.get("code"));
156 if !err.path.is_empty() {
157 let mut path = String::new();
158 for (idx, s) in err.path.iter().enumerate() {
159 if idx > 0 {
160 path.push('.');
161 }
162 match s {
163 PathSegment::Index(idx) => {
164 let _ = write!(&mut path, "{idx}");
165 }
166 PathSegment::Field(name) => {
167 let _ = write!(&mut path, "{name}");
168 }
169 }
170 }
171
172 if let Some(async_graphql_value::ConstValue::String(error_code)) = error_code {
173 if error_code.as_str() == code::INTERNAL_SERVER_ERROR {
174 error!(
175 %query_id,
176 %session_id,
177 error_code,
178 "[Response] path={} message={}",
179 path,
180 err.message,
181 );
182 } else {
183 info!(
184 %query_id,
185 %session_id,
186 error_code,
187 "[Response] path={} message={}",
188 path,
189 err.message,
190 );
191 }
192 } else {
193 warn!(
194 %query_id,
195 %session_id,
196 error_code = code::UNKNOWN,
197 "[Response] path={} message={}",
198 path,
199 err.message,
200 );
201 }
202 } else {
203 let error_code = if let Some(error_code) = error_code {
204 error_code.to_string()
205 } else {
206 code::UNKNOWN.to_string()
207 };
208 info!(
209 %query_id,
210 %session_id,
211 error_code,
212 "[Response] message={}", err.message
213 )
214 }
215 }
216 } else if self.config.log_response {
217 match operation_name {
218 Some("IntrospectionQuery") => {
219 debug!(
220 %query_id,
221 %session_id,
222 "[Schema] {}", resp.data
223 );
224 }
225 _ if self.is_subscription.load(Ordering::Relaxed) => {
226 debug!(
229 %query_id,
230 %session_id,
231 "[Subscription] {}", resp.data
232 );
233 }
234 _ => info!(
235 %query_id,
236 %session_id,
237 "[Response] {}", resp.data
238 ),
239 }
240 }
241 resp
242 }
243}