A pit full of rusty nails
at main 238 lines 6.9 kB view raw
1use std::borrow::Cow; 2 3use axum::{ 4 Extension, 5 body::{Body as AxumBody, Bytes}, 6 extract::{MatchedPath, Request}, 7 http::HeaderValue, 8 middleware::Next, 9 response::Response, 10}; 11use hyper::{ 12 HeaderMap, Uri, Version, 13 body::Body, 14 header::{CONTENT_ENCODING, CONTENT_TYPE, USER_AGENT}, 15}; 16use nailip::{IdentifiedPeer, header_value_to_str}; 17use opentelemetry::Context; 18use opentelemetry_http::HeaderExtractor; 19use opentelemetry_semantic_conventions::{ 20 attribute::OTEL_STATUS_CODE, 21 trace::{ERROR_TYPE, HTTP_RESPONSE_STATUS_CODE}, 22}; 23use tracing::{Span, field::Empty, info_span}; 24use uuid::Uuid; 25 26pub fn extract_context(headers: &HeaderMap) -> Context { 27 opentelemetry::global::get_text_map_propagator(|propagator| { 28 propagator.extract(&HeaderExtractor(headers)) 29 }) 30} 31 32#[inline] 33pub fn url_scheme(uri: &Uri) -> &str { 34 uri.scheme_str().unwrap_or_default() 35} 36 37#[inline] 38#[must_use] 39pub fn http_flavor(version: Version) -> Cow<'static, str> { 40 match version { 41 Version::HTTP_09 => "0.9".into(), 42 Version::HTTP_10 => "1.0".into(), 43 Version::HTTP_11 => "1.1".into(), 44 Version::HTTP_2 => "2.0".into(), 45 Version::HTTP_3 => "3.0".into(), 46 other => format!("{other:?}").into(), 47 } 48} 49 50pub async fn trace_connection_layer( 51 identified: Extension<IdentifiedPeer>, 52 req: Request, 53 next: Next, 54) -> Response<InspectBody<AxumBody>> { 55 use tracing_opentelemetry::OpenTelemetrySpanExt; 56 57 let headers = req.headers(); 58 59 let request_id = headers.get("x-request-id").cloned().unwrap_or_else( 60 // SAFETY: The UUID is converted to a valid UTF-8 string before being turned into 61 // Bytes. As such, the Bytes instance corresponds to a valid internal repr for 62 // HeaderValue, meaning we can skip validation directly. 63 || unsafe { 64 HeaderValue::from_maybe_shared_unchecked(Bytes::from(Uuid::now_v7().to_string())) 65 }, 66 ); 67 68 let path = req 69 .extensions() 70 .get::<MatchedPath>() 71 .map_or("not-matched", MatchedPath::as_str); 72 73 let http_method = req.method().as_str(); 74 75 let root_name = format!("{http_method} {path}"); 76 77 let mut peer = identified.peer().split(":"); 78 79 let span = info_span!( 80 "HTTP request", 81 http.request.method = %http_method, 82 http.route = path, // to set by router of "webframework" after 83 network.protocol.version = %http_flavor(req.version()), 84 client.address = Empty, 85 client.port = Empty, 86 user_agent.original = headers 87 .get(USER_AGENT) 88 .and_then(header_value_to_str) 89 .unwrap_or("None"), 90 http.response.status_code = Empty, // to set on response 91 http.response.header.content_encoding = Empty, 92 http.response.header.content_type = Empty, 93 url.path = req.uri().path(), 94 url.query = req.uri().query(), 95 http.scheme = url_scheme(req.uri()), 96 otel.name = root_name, // to set by router of "webframework" after 97 otel.kind = "server", 98 otel.status_code = Empty, // to set on response 99 trace_id = Empty, // to set on response 100 http.request.header.request_id = Empty, // to set 101 error.type = Empty, 102 ); 103 104 if let Some(address) = peer.next() { 105 span.record("client.address", address); 106 } 107 108 if let Some(port) = peer.next() { 109 span.record("client.port", port); 110 } 111 112 if let Some(request_id) = header_value_to_str(&request_id) { 113 span.record("http.request.header.request_id", request_id); 114 } 115 116 let _ = span.set_parent(extract_context(headers)); 117 118 let inner = span.in_scope(|| next.run(req)); 119 120 let response = InspectHttpResponse { 121 inner, 122 span: InspectState::Ready { span, request_id }, 123 }; 124 125 response.await 126} 127 128enum InspectState { 129 Ready { span: Span, request_id: HeaderValue }, 130 Finished, 131} 132 133impl InspectState { 134 #[inline] 135 fn span_ref(&self) -> &Span { 136 match self { 137 Self::Ready { span, .. } => span, 138 Self::Finished => unreachable!("Invalid state, future was polled after completion"), 139 } 140 } 141 142 #[inline] 143 fn take(&mut self) -> (Span, HeaderValue) { 144 let span = core::mem::replace(self, Self::Finished); 145 146 match span { 147 Self::Ready { span, request_id } => (span, request_id), 148 Self::Finished => unreachable!("Invalid state, future was polled after completion"), 149 } 150 } 151} 152 153pin_project_lite::pin_project! { 154 struct InspectHttpResponse<F> { 155 #[pin] 156 inner: F, 157 span: InspectState, 158 } 159} 160 161pin_project_lite::pin_project! { 162 #[doc(hidden)] 163 pub struct InspectBody<B> { 164 #[pin] 165 body: B, 166 span: Span, 167 } 168} 169 170impl<F> core::future::Future for InspectHttpResponse<F> 171where 172 F: core::future::Future<Output = Response>, 173{ 174 type Output = Response<InspectBody<AxumBody>>; 175 176 #[inline] 177 fn poll( 178 self: core::pin::Pin<&mut Self>, 179 cx: &mut core::task::Context<'_>, 180 ) -> core::task::Poll<Self::Output> { 181 let this = self.project(); 182 183 let span = this.span.span_ref(); 184 185 let poll = span.in_scope(|| this.inner.poll(cx)); 186 187 if let core::task::Poll::Ready(mut response) = poll { 188 let status = response.status(); 189 let headers = response.headers(); 190 191 span.record(HTTP_RESPONSE_STATUS_CODE, status.as_u16()); 192 193 if let Some(encoding) = headers.get(CONTENT_ENCODING).and_then(header_value_to_str) { 194 span.record("http.response.header.content_encoding", encoding); 195 } 196 197 if let Some(content_type) = headers.get(CONTENT_TYPE).and_then(header_value_to_str) { 198 span.record("http.response.header.content_type", content_type); 199 } 200 201 if status.is_client_error() || status.is_server_error() { 202 span.record(ERROR_TYPE, status.as_u16()); 203 } 204 205 if status.is_server_error() { 206 span.record(OTEL_STATUS_CODE, "ERROR"); 207 } else { 208 span.record(OTEL_STATUS_CODE, "OK"); 209 } 210 211 let (span, request_id) = this.span.take(); 212 213 response.headers_mut().insert("x-request-id", request_id); 214 215 core::task::Poll::Ready(response.map(|body| InspectBody { body, span })) 216 } else { 217 core::task::Poll::Pending 218 } 219 } 220} 221 222impl<B> Body for InspectBody<B> 223where 224 B: Body, 225{ 226 type Data = B::Data; 227 type Error = B::Error; 228 229 #[inline(always)] 230 fn poll_frame( 231 self: core::pin::Pin<&mut Self>, 232 cx: &mut core::task::Context<'_>, 233 ) -> core::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { 234 let this = self.project(); 235 236 this.span.in_scope(|| this.body.poll_frame(cx)) 237 } 238}