1use std::borrow::Cow;
2
3use axum::{
4 Extension,
5 body::{Body as AxumBody, Bytes},
6 extract::{MatchedPath, Request},
7 http::HeaderValue,
8 middleware::Next,
9 response::Response,
10};
11use hyper::{
12 HeaderMap, Uri, Version,
13 body::Body,
14 header::{CONTENT_ENCODING, CONTENT_TYPE, USER_AGENT},
15};
16use nailip::{IdentifiedPeer, header_value_to_str};
17use opentelemetry::Context;
18use opentelemetry_http::HeaderExtractor;
19use opentelemetry_semantic_conventions::{
20 attribute::OTEL_STATUS_CODE,
21 trace::{ERROR_TYPE, HTTP_RESPONSE_STATUS_CODE},
22};
23use tracing::{Span, field::Empty, info_span};
24use uuid::Uuid;
25
26pub fn extract_context(headers: &HeaderMap) -> Context {
27 opentelemetry::global::get_text_map_propagator(|propagator| {
28 propagator.extract(&HeaderExtractor(headers))
29 })
30}
31
32#[inline]
33pub fn url_scheme(uri: &Uri) -> &str {
34 uri.scheme_str().unwrap_or_default()
35}
36
37#[inline]
38#[must_use]
39pub fn http_flavor(version: Version) -> Cow<'static, str> {
40 match version {
41 Version::HTTP_09 => "0.9".into(),
42 Version::HTTP_10 => "1.0".into(),
43 Version::HTTP_11 => "1.1".into(),
44 Version::HTTP_2 => "2.0".into(),
45 Version::HTTP_3 => "3.0".into(),
46 other => format!("{other:?}").into(),
47 }
48}
49
50pub async fn trace_connection_layer(
51 identified: Extension<IdentifiedPeer>,
52 req: Request,
53 next: Next,
54) -> Response<InspectBody<AxumBody>> {
55 use tracing_opentelemetry::OpenTelemetrySpanExt;
56
57 let headers = req.headers();
58
59 let request_id = headers.get("x-request-id").cloned().unwrap_or_else(
60 // SAFETY: The UUID is converted to a valid UTF-8 string before being turned into
61 // Bytes. As such, the Bytes instance corresponds to a valid internal repr for
62 // HeaderValue, meaning we can skip validation directly.
63 || unsafe {
64 HeaderValue::from_maybe_shared_unchecked(Bytes::from(Uuid::now_v7().to_string()))
65 },
66 );
67
68 let path = req
69 .extensions()
70 .get::<MatchedPath>()
71 .map_or("not-matched", MatchedPath::as_str);
72
73 let http_method = req.method().as_str();
74
75 let root_name = format!("{http_method} {path}");
76
77 let mut peer = identified.peer().split(":");
78
79 let span = info_span!(
80 "HTTP request",
81 http.request.method = %http_method,
82 http.route = path, // to set by router of "webframework" after
83 network.protocol.version = %http_flavor(req.version()),
84 client.address = Empty,
85 client.port = Empty,
86 user_agent.original = headers
87 .get(USER_AGENT)
88 .and_then(header_value_to_str)
89 .unwrap_or("None"),
90 http.response.status_code = Empty, // to set on response
91 http.response.header.content_encoding = Empty,
92 http.response.header.content_type = Empty,
93 url.path = req.uri().path(),
94 url.query = req.uri().query(),
95 http.scheme = url_scheme(req.uri()),
96 otel.name = root_name, // to set by router of "webframework" after
97 otel.kind = "server",
98 otel.status_code = Empty, // to set on response
99 trace_id = Empty, // to set on response
100 http.request.header.request_id = Empty, // to set
101 error.type = Empty,
102 );
103
104 if let Some(address) = peer.next() {
105 span.record("client.address", address);
106 }
107
108 if let Some(port) = peer.next() {
109 span.record("client.port", port);
110 }
111
112 if let Some(request_id) = header_value_to_str(&request_id) {
113 span.record("http.request.header.request_id", request_id);
114 }
115
116 let _ = span.set_parent(extract_context(headers));
117
118 let inner = span.in_scope(|| next.run(req));
119
120 let response = InspectHttpResponse {
121 inner,
122 span: InspectState::Ready { span, request_id },
123 };
124
125 response.await
126}
127
128enum InspectState {
129 Ready { span: Span, request_id: HeaderValue },
130 Finished,
131}
132
133impl InspectState {
134 #[inline]
135 fn span_ref(&self) -> &Span {
136 match self {
137 Self::Ready { span, .. } => span,
138 Self::Finished => unreachable!("Invalid state, future was polled after completion"),
139 }
140 }
141
142 #[inline]
143 fn take(&mut self) -> (Span, HeaderValue) {
144 let span = core::mem::replace(self, Self::Finished);
145
146 match span {
147 Self::Ready { span, request_id } => (span, request_id),
148 Self::Finished => unreachable!("Invalid state, future was polled after completion"),
149 }
150 }
151}
152
153pin_project_lite::pin_project! {
154 struct InspectHttpResponse<F> {
155 #[pin]
156 inner: F,
157 span: InspectState,
158 }
159}
160
161pin_project_lite::pin_project! {
162 #[doc(hidden)]
163 pub struct InspectBody<B> {
164 #[pin]
165 body: B,
166 span: Span,
167 }
168}
169
170impl<F> core::future::Future for InspectHttpResponse<F>
171where
172 F: core::future::Future<Output = Response>,
173{
174 type Output = Response<InspectBody<AxumBody>>;
175
176 #[inline]
177 fn poll(
178 self: core::pin::Pin<&mut Self>,
179 cx: &mut core::task::Context<'_>,
180 ) -> core::task::Poll<Self::Output> {
181 let this = self.project();
182
183 let span = this.span.span_ref();
184
185 let poll = span.in_scope(|| this.inner.poll(cx));
186
187 if let core::task::Poll::Ready(mut response) = poll {
188 let status = response.status();
189 let headers = response.headers();
190
191 span.record(HTTP_RESPONSE_STATUS_CODE, status.as_u16());
192
193 if let Some(encoding) = headers.get(CONTENT_ENCODING).and_then(header_value_to_str) {
194 span.record("http.response.header.content_encoding", encoding);
195 }
196
197 if let Some(content_type) = headers.get(CONTENT_TYPE).and_then(header_value_to_str) {
198 span.record("http.response.header.content_type", content_type);
199 }
200
201 if status.is_client_error() || status.is_server_error() {
202 span.record(ERROR_TYPE, status.as_u16());
203 }
204
205 if status.is_server_error() {
206 span.record(OTEL_STATUS_CODE, "ERROR");
207 } else {
208 span.record(OTEL_STATUS_CODE, "OK");
209 }
210
211 let (span, request_id) = this.span.take();
212
213 response.headers_mut().insert("x-request-id", request_id);
214
215 core::task::Poll::Ready(response.map(|body| InspectBody { body, span }))
216 } else {
217 core::task::Poll::Pending
218 }
219 }
220}
221
222impl<B> Body for InspectBody<B>
223where
224 B: Body,
225{
226 type Data = B::Data;
227 type Error = B::Error;
228
229 #[inline(always)]
230 fn poll_frame(
231 self: core::pin::Pin<&mut Self>,
232 cx: &mut core::task::Context<'_>,
233 ) -> core::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
234 let this = self.project();
235
236 this.span.in_scope(|| this.body.poll_frame(cx))
237 }
238}