Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 204 lines 5.0 kB view raw
1package main 2 3import ( 4 "encoding/base64" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "mime" 10 "net/http" 11 "strings" 12 "sync/atomic" 13 "time" 14 15 "github.com/google/uuid" 16) 17 18type Server struct { 19 broker *Broker 20 config *atomic.Pointer[Configuration] 21} 22 23func NewServer(broker *Broker, config *atomic.Pointer[Configuration]) http.Handler { 24 s := &Server{broker: broker, config: config} 25 return http.HandlerFunc(s.ServeHTTP) 26} 27 28func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 29 setCORSHeaders(w) 30 31 if r.Method == "GET" && r.URL.Path == "/_health" { 32 w.WriteHeader(http.StatusNoContent) 33 return 34 } 35 36 switch r.Method { 37 case "OPTIONS": 38 w.WriteHeader(http.StatusNoContent) 39 case "POST": 40 s.handlePost(w, r) 41 case "GET": 42 if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") { 43 http.NotFound(w, r) 44 return 45 } 46 s.handleSSE(w, r) 47 default: 48 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 49 } 50} 51 52func normalizePath(raw string) string { 53 return strings.TrimRight(strings.TrimPrefix(raw, "/"), "/") 54} 55 56func setCORSHeaders(w http.ResponseWriter) { 57 w.Header().Set("Access-Control-Allow-Origin", "*") 58 w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") 59 w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, Last-Event-ID") 60} 61 62func (s *Server) handlePost(w http.ResponseWriter, r *http.Request) { 63 path := normalizePath(r.URL.Path) 64 65 body, _ := io.ReadAll(r.Body) 66 67 cfg := s.config.Load() 68 if pc := cfg.LookupVerification(path); pc != nil { 69 verifier, err := NewVerifier(pc.Verify) 70 if err != nil { 71 http.Error(w, "server configuration error", http.StatusInternalServerError) 72 return 73 } 74 if err := verifier.Verify(body, r.Header, pc.Secret, pc.SignatureHeader); err != nil { 75 log.Printf("verification failed for %s: %v, headers: %v", path, err, r.Header) 76 http.Error(w, "forbidden", http.StatusForbidden) 77 return 78 } 79 } 80 81 var payload any 82 mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) 83 if err == nil && mediaType == "application/json" { 84 if err := json.Unmarshal(body, &payload); err != nil { 85 http.Error(w, "invalid JSON", http.StatusBadRequest) 86 return 87 } 88 } else if err == nil && isTextContent(mediaType, params) { 89 payload = string(body) 90 } else { 91 payload = base64.StdEncoding.EncodeToString(body) 92 } 93 94 headers := extractHeaders(r.Header) 95 96 event := &Event{ 97 ID: uuid.New().String(), 98 Timestamp: time.Now().UTC(), 99 Method: r.Method, 100 Path: path, 101 Headers: headers, 102 Payload: payload, 103 } 104 105 if err := s.broker.Publish(event); err != nil { 106 log.Printf("publish failed for %s: %v", path, err) 107 http.Error(w, "publish failed", http.StatusInternalServerError) 108 return 109 } 110 log.Printf("published %s %s (%s) id=%s", r.Method, path, r.Header.Get("Content-Type"), event.ID) 111 w.WriteHeader(http.StatusAccepted) 112} 113 114func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { 115 path := normalizePath(r.URL.Path) 116 117 cfg := s.config.Load() 118 if secret := cfg.LookupSubscribeSecret(path); secret != "" { 119 auth := r.Header.Get("Authorization") 120 if !strings.HasPrefix(auth, "Bearer ") || strings.TrimPrefix(auth, "Bearer ") != secret { 121 http.Error(w, "unauthorized", http.StatusUnauthorized) 122 return 123 } 124 } 125 126 flusher, ok := w.(http.Flusher) 127 if !ok { 128 http.Error(w, "streaming unsupported", http.StatusInternalServerError) 129 return 130 } 131 132 filters := ParseFilters(r.URL.Query()) 133 lastEventID := r.Header.Get("Last-Event-ID") 134 135 ch, unsub := s.broker.Subscribe(path, lastEventID) 136 defer unsub() 137 138 w.Header().Set("Content-Type", "text/event-stream") 139 w.Header().Set("Cache-Control", "no-cache") 140 w.Header().Set("Connection", "keep-alive") 141 w.WriteHeader(http.StatusOK) 142 flusher.Flush() 143 144 ctx := r.Context() 145 for { 146 select { 147 case <-ctx.Done(): 148 return 149 case event, ok := <-ch: 150 if !ok { 151 return 152 } 153 if !MatchAll(filters, event) { 154 continue 155 } 156 data, err := json.Marshal(event) 157 if err != nil { 158 log.Printf("marshaling event %s: %v", event.ID, err) 159 continue 160 } 161 fmt.Fprintf(w, "id: %s\ndata: %s\n\n", event.ID, data) 162 flusher.Flush() 163 } 164 } 165} 166 167var hopByHopHeaders = map[string]bool{ 168 "Connection": true, 169 "Keep-Alive": true, 170 "Proxy-Authenticate": true, 171 "Proxy-Authorization": true, 172 "Te": true, 173 "Trailer": true, 174 "Transfer-Encoding": true, 175 "Upgrade": true, 176 "Host": true, 177 "Content-Length": true, 178} 179 180func extractHeaders(h http.Header) map[string]string { 181 headers := make(map[string]string) 182 for name, values := range h { 183 if hopByHopHeaders[name] { 184 continue 185 } 186 headers[name] = values[0] 187 } 188 return headers 189} 190 191func isTextContent(mediaType string, params map[string]string) bool { 192 if strings.HasPrefix(mediaType, "text/") { 193 return true 194 } 195 if mediaType == "application/x-www-form-urlencoded" || 196 mediaType == "application/xml" || 197 mediaType == "application/xhtml+xml" { 198 return true 199 } 200 if _, ok := params["charset"]; ok { 201 return true 202 } 203 return false 204}