package main

import (
	"crypto/subtle"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"mime"
	"net/http"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
)

// Server is the HTTP front end: POST requests are turned into events and
// published to the broker, and GET requests that accept text/event-stream
// subscribe to those events as a server-sent event stream.
type Server struct {
	broker *Broker
	// config is loaded once per request via Load.
	config *atomic.Pointer[Configuration]
}

// NewServer returns an http.Handler that serves requests using the given
// broker and configuration pointer.
func NewServer(broker *Broker, config *atomic.Pointer[Configuration]) http.Handler {
	s := &Server{broker: broker, config: config}
	return http.HandlerFunc(s.ServeHTTP)
}

// ServeHTTP sets the CORS headers on every response and then dispatches:
// GET /_health and OPTIONS answer 204, POST publishes an event, GET with an
// Accept header containing text/event-stream opens a stream (any other GET is
// a 404), and every other method is answered with 405.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	setCORSHeaders(w)

	if r.Method == http.MethodGet && r.URL.Path == "/_health" {
		w.WriteHeader(http.StatusNoContent)
		return
	}

	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
	case http.MethodPost:
		s.handlePost(w, r)
	case http.MethodGet:
		if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
			http.NotFound(w, r)
			return
		}
		s.handleSSE(w, r)
	default:
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// normalizePath strips one leading "/" and all trailing "/" characters from
// raw, so "/hooks/a/" becomes "hooks/a".
func normalizePath(raw string) string {
	return strings.TrimRight(strings.TrimPrefix(raw, "/"), "/")
}

// setCORSHeaders allows any origin to issue GET, POST and OPTIONS requests
// carrying the Content-Type, Authorization and Last-Event-ID headers.
func setCORSHeaders(w http.ResponseWriter) {
	h := w.Header()
	h.Set("Access-Control-Allow-Origin", "*")
	h.Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
	h.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, Last-Event-ID")
}

// handlePost reads the request body, verifies it when the configuration has
// a verification entry for the path, decodes it according to Content-Type and
// publishes the resulting event to the broker.
//
// Responses: 202 on success, 400 when the body cannot be read or is invalid
// JSON, 403 when verification fails, 500 when the verifier cannot be built or
// publishing fails.
func (s *Server) handlePost(w http.ResponseWriter, r *http.Request) {
	path := normalizePath(r.URL.Path)

	// NOTE(review): the body is read without a size limit; consider wrapping
	// r.Body in http.MaxBytesReader once an acceptable maximum is agreed.
	body, err := io.ReadAll(r.Body)
	if err != nil {
		// A partial body must not be verified or published as if complete.
		log.Printf("reading body for %s: %v", path, err)
		http.Error(w, "cannot read request body", http.StatusBadRequest)
		return
	}

	cfg := s.config.Load()
	if pc := cfg.LookupVerification(path); pc != nil {
		verifier, err := NewVerifier(pc.Verify)
		if err != nil {
			http.Error(w, "server configuration error", http.StatusInternalServerError)
			return
		}
		if err := verifier.Verify(body, r.Header, pc.Secret, pc.SignatureHeader); err != nil {
			// Header values are deliberately not logged: they can carry
			// signatures and Authorization credentials.
			log.Printf("verification failed for %s: %v", path, err)
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
	}

	// Payload representation depends on the content type: JSON is decoded,
	// text-like bodies are kept as strings, and everything else (including
	// an unparsable Content-Type) is base64-encoded.
	var payload any
	mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
	switch {
	case err == nil && mediaType == "application/json":
		if err := json.Unmarshal(body, &payload); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
	case err == nil && isTextContent(mediaType, params):
		payload = string(body)
	default:
		payload = base64.StdEncoding.EncodeToString(body)
	}

	event := &Event{
		ID:        uuid.New().String(),
		Timestamp: time.Now().UTC(),
		Method:    r.Method,
		Path:      path,
		Headers:   extractHeaders(r.Header),
		Payload:   payload,
	}

	if err := s.broker.Publish(event); err != nil {
		log.Printf("publish failed for %s: %v", path, err)
		http.Error(w, "publish failed", http.StatusInternalServerError)
		return
	}

	log.Printf("published %s %s (%s) id=%s", r.Method, path, r.Header.Get("Content-Type"), event.ID)
	w.WriteHeader(http.StatusAccepted)
}

// handleSSE streams events for the request path as server-sent events until
// the client goes away, the subscription channel is closed, or a write fails.
//
// When the configuration has a subscribe secret for the path, the request
// must carry "Authorization: Bearer <secret>" or it is answered with 401.
// Events that do not match the filters parsed from the query string are
// skipped. The Last-Event-ID request header is passed to the broker's
// Subscribe call.
func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) {
	path := normalizePath(r.URL.Path)

	cfg := s.config.Load()
	if secret := cfg.LookupSubscribeSecret(path); secret != "" {
		auth := r.Header.Get("Authorization")
		token := strings.TrimPrefix(auth, "Bearer ")
		// Constant-time comparison avoids leaking how much of the token
		// matched through response timing.
		if !strings.HasPrefix(auth, "Bearer ") ||
			subtle.ConstantTimeCompare([]byte(token), []byte(secret)) != 1 {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	filters := ParseFilters(r.URL.Query())
	lastEventID := r.Header.Get("Last-Event-ID")

	ch, unsub := s.broker.Subscribe(path, lastEventID)
	defer unsub()

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.WriteHeader(http.StatusOK)
	// Flush immediately so the client sees the headers before any event.
	flusher.Flush()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-ch:
			if !ok {
				return
			}
			if !MatchAll(filters, event) {
				continue
			}
			data, err := json.Marshal(event)
			if err != nil {
				log.Printf("marshaling event %s: %v", event.ID, err)
				continue
			}
			if _, err := fmt.Fprintf(w, "id: %s\ndata: %s\n\n", event.ID, data); err != nil {
				// The connection is unusable; stop instead of marshaling
				// further events that can never be delivered.
				return
			}
			flusher.Flush()
		}
	}
}

// hopByHopHeaders lists the header names, in canonical form, that
// extractHeaders leaves out of published events.
var hopByHopHeaders = map[string]bool{
	"Connection":          true,
	"Keep-Alive":          true,
	"Proxy-Authenticate":  true,
	"Proxy-Authorization": true,
	"Te":                  true,
	"Trailer":             true,
	"Transfer-Encoding":   true,
	"Upgrade":             true,
	"Host":                true,
	"Content-Length":      true,
}

// extractHeaders returns a flat copy of h that keeps only the first value of
// each header and omits every name listed in hopByHopHeaders. Keys that have
// no values are skipped.
func extractHeaders(h http.Header) map[string]string {
	headers := make(map[string]string, len(h))
	for name, values := range h {
		if hopByHopHeaders[name] || len(values) == 0 {
			continue
		}
		headers[name] = values[0]
	}
	return headers
}

// isTextContent reports whether a body of the given media type should be
// kept as a string: any text/* type, URL-encoded forms, XML and XHTML, or any
// type that declares a charset parameter.
func isTextContent(mediaType string, params map[string]string) bool {
	if strings.HasPrefix(mediaType, "text/") {
		return true
	}
	switch mediaType {
	case "application/x-www-form-urlencoded", "application/xml", "application/xhtml+xml":
		return true
	}
	_, hasCharset := params["charset"]
	return hasCharset
}