iota_graphql_rpc/extensions/
logger.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    fmt::Write,
7    net::SocketAddr,
8    sync::{
9        Arc,
10        atomic::{AtomicBool, Ordering},
11    },
12};
13
14use async_graphql::{
15    PathSegment, Response, ServerError, ServerResult, ValidationResult, Variables,
16    extensions::{
17        Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextResolve,
18        NextSubscribe, NextValidation, ResolveInfo,
19    },
20    parser::types::{ExecutableDocument, OperationType, Selection},
21};
22use async_graphql_value::ConstValue;
23use futures::stream::BoxStream;
24use tracing::{debug, error, info, warn};
25use uuid::Uuid;
26
27use crate::{error::code, metrics::Metrics};
28
29#[derive(Clone, Debug)]
30pub struct LoggerConfig {
31    pub log_request_query: bool,
32    pub log_response: bool,
33    pub log_complexity: bool,
34}
35
36impl Default for LoggerConfig {
37    fn default() -> Self {
38        Self {
39            log_request_query: false,
40            log_response: true,
41            log_complexity: true,
42        }
43    }
44}
45
46#[derive(Clone, Debug, Default)]
47pub struct Logger {
48    config: LoggerConfig,
49}
50
51impl ExtensionFactory for Logger {
52    fn create(&self) -> Arc<dyn Extension> {
53        Arc::new(LoggerExtension {
54            config: self.config.clone(),
55            is_subscription: AtomicBool::default(),
56        })
57    }
58}
59
60struct LoggerExtension {
61    config: LoggerConfig,
62    /// Marks if the current request is a subscription.
63    is_subscription: AtomicBool,
64}
65
66#[async_trait::async_trait]
67impl Extension for LoggerExtension {
68    fn subscribe<'s>(
69        &self,
70        ctx: &ExtensionContext<'_>,
71        stream: BoxStream<'s, Response>,
72        next: NextSubscribe<'_>,
73    ) -> BoxStream<'s, Response> {
74        // flag this operation as a subscription so later hooks (execute) can
75        // adjust logging.
76        self.is_subscription.store(true, Ordering::Relaxed);
77        next.run(ctx, stream)
78    }
79
80    // This hook is used to get the top level node name for recording in the metrics
81    // which top level nodes are being called.
82    async fn resolve(
83        &self,
84        ctx: &ExtensionContext<'_>,
85        info: ResolveInfo<'_>,
86        next: NextResolve<'_>,
87    ) -> ServerResult<Option<ConstValue>> {
88        if info.path_node.parent.is_none() {
89            ctx.data_unchecked::<Metrics>()
90                .request_metrics
91                .num_queries_top_level
92                .with_label_values(&[info.name])
93                .inc();
94        }
95        next.run(ctx, info).await
96    }
97
98    async fn parse_query(
99        &self,
100        ctx: &ExtensionContext<'_>,
101        query: &str,
102        variables: &Variables,
103        next: NextParseQuery<'_>,
104    ) -> ServerResult<ExecutableDocument> {
105        let document = next.run(ctx, query, variables).await?;
106        let is_schema = document
107            .operations
108            .iter()
109            .filter(|(_, operation)| operation.node.ty == OperationType::Query)
110            .any(|(_, operation)| operation.node.selection_set.node.items.iter().any(|selection| matches!(&selection.node, Selection::Field(field) if field.node.name.node == "__schema")));
111        let query_id: &Uuid = ctx.data_unchecked();
112        let session_id: &SocketAddr = ctx.data_unchecked();
113        if !is_schema && self.config.log_request_query {
114            info!(
115                %query_id,
116                %session_id,
117                "[Query] {}",
118                ctx.stringify_execute_doc(&document, variables)
119            );
120        }
121        Ok(document)
122    }
123
124    async fn validation(
125        &self,
126        ctx: &ExtensionContext<'_>,
127        next: NextValidation<'_>,
128    ) -> Result<ValidationResult, Vec<ServerError>> {
129        let res = next.run(ctx).await?;
130        let query_id: &Uuid = ctx.data_unchecked();
131        let session_id: &SocketAddr = ctx.data_unchecked();
132        if self.config.log_complexity {
133            info!(
134                %query_id,
135                %session_id,
136                complexity = res.complexity,
137                depth = res.depth,
138                "[Validation]",
139            );
140        }
141        Ok(res)
142    }
143
144    async fn execute(
145        &self,
146        ctx: &ExtensionContext<'_>,
147        operation_name: Option<&str>,
148        next: NextExecute<'_>,
149    ) -> Response {
150        let resp = next.run(ctx, operation_name).await;
151        let query_id: &Uuid = ctx.data_unchecked();
152        let session_id: &SocketAddr = ctx.data_unchecked();
153        if resp.is_err() {
154            for err in &resp.errors {
155                let error_code = &err.extensions.as_ref().and_then(|x| x.get("code"));
156                if !err.path.is_empty() {
157                    let mut path = String::new();
158                    for (idx, s) in err.path.iter().enumerate() {
159                        if idx > 0 {
160                            path.push('.');
161                        }
162                        match s {
163                            PathSegment::Index(idx) => {
164                                let _ = write!(&mut path, "{idx}");
165                            }
166                            PathSegment::Field(name) => {
167                                let _ = write!(&mut path, "{name}");
168                            }
169                        }
170                    }
171
172                    if let Some(async_graphql_value::ConstValue::String(error_code)) = error_code {
173                        if error_code.as_str() == code::INTERNAL_SERVER_ERROR {
174                            error!(
175                                %query_id,
176                                %session_id,
177                                error_code,
178                                "[Response] path={} message={}",
179                                path,
180                                err.message,
181                            );
182                        } else {
183                            info!(
184                                %query_id,
185                                %session_id,
186                                error_code,
187                                "[Response] path={} message={}",
188                                path,
189                                err.message,
190                            );
191                        }
192                    } else {
193                        warn!(
194                            %query_id,
195                            %session_id,
196                            error_code = code::UNKNOWN,
197                            "[Response] path={} message={}",
198                            path,
199                            err.message,
200                        );
201                    }
202                } else {
203                    let error_code = if let Some(error_code) = error_code {
204                        error_code.to_string()
205                    } else {
206                        code::UNKNOWN.to_string()
207                    };
208                    info!(
209                        %query_id,
210                        %session_id,
211                        error_code,
212                        "[Response] message={}", err.message
213                    )
214                }
215            }
216        } else if self.config.log_response {
217            match operation_name {
218                Some("IntrospectionQuery") => {
219                    debug!(
220                        %query_id,
221                        %session_id,
222                        "[Schema] {}", resp.data
223                    );
224                }
225                _ if self.is_subscription.load(Ordering::Relaxed) => {
226                    // a subscription can emit many payloads; to avoid flooding normal response
227                    // logs we log subscription payloads at debug level.
228                    debug!(
229                        %query_id,
230                        %session_id,
231                        "[Subscription] {}", resp.data
232                    );
233                }
234                _ => info!(
235                    %query_id,
236                    %session_id,
237                    "[Response] {}", resp.data
238                ),
239            }
240        }
241        resp
242    }
243}