Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
import (
	"crypto/subtle"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"mime"
	"net/http"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
)
17
18type Server struct {
19 broker *Broker
20 config *atomic.Pointer[Configuration]
21}
22
23func NewServer(broker *Broker, config *atomic.Pointer[Configuration]) http.Handler {
24 s := &Server{broker: broker, config: config}
25 return http.HandlerFunc(s.ServeHTTP)
26}
27
28func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
29 setCORSHeaders(w)
30
31 if r.Method == "GET" && r.URL.Path == "/_health" {
32 w.WriteHeader(http.StatusNoContent)
33 return
34 }
35
36 switch r.Method {
37 case "OPTIONS":
38 w.WriteHeader(http.StatusNoContent)
39 case "POST":
40 s.handlePost(w, r)
41 case "GET":
42 if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
43 http.NotFound(w, r)
44 return
45 }
46 s.handleSSE(w, r)
47 default:
48 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
49 }
50}
51
// normalizePath turns a request URL path into a topic path by removing every
// leading and trailing slash. Interior slashes are left untouched.
//
// Both ends are trimmed the same way so that "/a", "//a" and "/a/" all map
// to the same topic; trimming only a single leading slash would leave "//a"
// as "/a", which differs from the "a" that "/a" produces.
func normalizePath(raw string) string {
	return strings.Trim(raw, "/")
}
55
// setCORSHeaders writes the gateway's fixed CORS policy onto the response:
// any origin, the GET/POST/OPTIONS methods, and the request headers that
// publishers and subscribers send.
func setCORSHeaders(w http.ResponseWriter) {
	policy := []struct{ name, value string }{
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Methods", "GET, POST, OPTIONS"},
		{"Access-Control-Allow-Headers", "Content-Type, Authorization, Last-Event-ID"},
	}
	hdr := w.Header()
	for _, p := range policy {
		hdr.Set(p.name, p.value)
	}
}
61
62func (s *Server) handlePost(w http.ResponseWriter, r *http.Request) {
63 path := normalizePath(r.URL.Path)
64
65 body, _ := io.ReadAll(r.Body)
66
67 cfg := s.config.Load()
68 if pc := cfg.LookupVerification(path); pc != nil {
69 verifier, err := NewVerifier(pc.Verify)
70 if err != nil {
71 http.Error(w, "server configuration error", http.StatusInternalServerError)
72 return
73 }
74 if err := verifier.Verify(body, r.Header, pc.Secret, pc.SignatureHeader); err != nil {
75 log.Printf("verification failed for %s: %v, headers: %v", path, err, r.Header)
76 http.Error(w, "forbidden", http.StatusForbidden)
77 return
78 }
79 }
80
81 var payload any
82 mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
83 if err == nil && mediaType == "application/json" {
84 if err := json.Unmarshal(body, &payload); err != nil {
85 http.Error(w, "invalid JSON", http.StatusBadRequest)
86 return
87 }
88 } else if err == nil && isTextContent(mediaType, params) {
89 payload = string(body)
90 } else {
91 payload = base64.StdEncoding.EncodeToString(body)
92 }
93
94 headers := extractHeaders(r.Header)
95
96 event := &Event{
97 ID: uuid.New().String(),
98 Timestamp: time.Now().UTC(),
99 Method: r.Method,
100 Path: path,
101 Headers: headers,
102 Payload: payload,
103 }
104
105 if err := s.broker.Publish(event); err != nil {
106 log.Printf("publish failed for %s: %v", path, err)
107 http.Error(w, "publish failed", http.StatusInternalServerError)
108 return
109 }
110 log.Printf("published %s %s (%s) id=%s", r.Method, path, r.Header.Get("Content-Type"), event.ID)
111 w.WriteHeader(http.StatusAccepted)
112}
113
114func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) {
115 path := normalizePath(r.URL.Path)
116
117 cfg := s.config.Load()
118 if secret := cfg.LookupSubscribeSecret(path); secret != "" {
119 auth := r.Header.Get("Authorization")
120 if !strings.HasPrefix(auth, "Bearer ") || strings.TrimPrefix(auth, "Bearer ") != secret {
121 http.Error(w, "unauthorized", http.StatusUnauthorized)
122 return
123 }
124 }
125
126 flusher, ok := w.(http.Flusher)
127 if !ok {
128 http.Error(w, "streaming unsupported", http.StatusInternalServerError)
129 return
130 }
131
132 filters := ParseFilters(r.URL.Query())
133 lastEventID := r.Header.Get("Last-Event-ID")
134
135 ch, unsub := s.broker.Subscribe(path, lastEventID)
136 defer unsub()
137
138 w.Header().Set("Content-Type", "text/event-stream")
139 w.Header().Set("Cache-Control", "no-cache")
140 w.Header().Set("Connection", "keep-alive")
141 w.WriteHeader(http.StatusOK)
142 flusher.Flush()
143
144 ctx := r.Context()
145 for {
146 select {
147 case <-ctx.Done():
148 return
149 case event, ok := <-ch:
150 if !ok {
151 return
152 }
153 if !MatchAll(filters, event) {
154 continue
155 }
156 data, err := json.Marshal(event)
157 if err != nil {
158 log.Printf("marshaling event %s: %v", event.ID, err)
159 continue
160 }
161 fmt.Fprintf(w, "id: %s\ndata: %s\n\n", event.ID, data)
162 flusher.Flush()
163 }
164 }
165}
166
// hopByHopHeaders is the set of request header names that extractHeaders
// leaves out of published events. Keys are in canonical MIME header form
// ("Te" is the canonical spelling of "TE").
var hopByHopHeaders = map[string]bool{
	// Connection-level headers.
	"Connection":        true,
	"Keep-Alive":        true,
	"Te":                true,
	"Trailer":           true,
	"Transfer-Encoding": true,
	"Upgrade":           true,

	// Proxy authentication headers.
	"Proxy-Authenticate":  true,
	"Proxy-Authorization": true,

	// Additional headers excluded alongside the ones above.
	"Host":           true,
	"Content-Length": true,
}
179
180func extractHeaders(h http.Header) map[string]string {
181 headers := make(map[string]string)
182 for name, values := range h {
183 if hopByHopHeaders[name] {
184 continue
185 }
186 headers[name] = values[0]
187 }
188 return headers
189}
190
// isTextContent reports whether a body with the given parsed media type
// should be carried as a plain string. That is the case for any "text/*"
// type, for form-encoded, XML and XHTML bodies, and for any type that was
// sent with a charset parameter.
func isTextContent(mediaType string, params map[string]string) bool {
	switch mediaType {
	case "application/x-www-form-urlencoded",
		"application/xml",
		"application/xhtml+xml":
		return true
	}
	if strings.HasPrefix(mediaType, "text/") {
		return true
	}
	_, hasCharset := params["charset"]
	return hasCharset
}