package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"
)

// postAndReceive subscribes to the "test" channel on broker, POSTs body to
// ts.URL+"/test" with the given Content-Type header, and returns the first
// event delivered on the subscription.
//
// The test fails immediately if the request cannot be built or sent, if the
// response status is not 202 Accepted, or if no event arrives within two
// seconds.
func postAndReceive(t *testing.T, ts *httptest.Server, broker *Broker, contentType string, body string) *Event {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Subscribe before posting so the published event cannot be missed.
	ch, unsub := broker.Subscribe("test", "")
	defer unsub()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, ts.URL+"/test", strings.NewReader(body))
	if err != nil {
		t.Fatalf("building POST request failed: %v", err)
	}
	req.Header.Set("Content-Type", contentType)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatalf("POST failed: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusAccepted {
		t.Fatalf("expected 202, got %d", resp.StatusCode)
	}

	select {
	case event := <-ch:
		return event
	case <-ctx.Done():
		t.Fatal("timed out waiting for event")
		return nil // unreachable at runtime; required by the compiler
	}
}

// TestServer_postFormDataStoredAsText checks that a form-urlencoded body is
// delivered as a string payload equal to the raw request body.
func TestServer_postFormDataStoredAsText(t *testing.T) {
	ts, broker, cancel := newTestServer(nil)
	defer cancel()
	defer ts.Close()

	event := postAndReceive(t, ts, broker, "application/x-www-form-urlencoded", "foo=bar&baz=qux")

	s, ok := event.Payload.(string)
	if !ok {
		t.Fatalf("expected string payload, got %T", event.Payload)
	}
	if s != "foo=bar&baz=qux" {
		t.Errorf("expected raw form data, got %s", s)
	}
}

// TestServer_postPlainTextStoredAsText checks that a text/plain body is
// delivered as a string payload equal to the raw request body.
func TestServer_postPlainTextStoredAsText(t *testing.T) {
	ts, broker, cancel := newTestServer(nil)
	defer cancel()
	defer ts.Close()

	event := postAndReceive(t, ts, broker, "text/plain", "hello world")

	s, ok := event.Payload.(string)
	if !ok {
		t.Fatalf("expected string payload, got %T", event.Payload)
	}
	if s != "hello world" {
		t.Errorf("expected raw text, got %s", s)
	}
}

// TestServer_postBinaryBase64Encoded checks that an application/octet-stream
// body is delivered as a string payload holding the standard base64 encoding
// of the request bytes.
func TestServer_postBinaryBase64Encoded(t *testing.T) {
	ts, broker, cancel := newTestServer(nil)
	defer cancel()
	defer ts.Close()

	event := postAndReceive(t, ts, broker, "application/octet-stream", "\x00\x01\x02\x03")

	s, ok := event.Payload.(string)
	if !ok {
		t.Fatalf("expected string payload, got %T", event.Payload)
	}
	expected := base64.StdEncoding.EncodeToString([]byte("\x00\x01\x02\x03"))
	if s != expected {
		t.Errorf("expected base64 %s, got %s", expected, s)
	}
}

// TestServer_postIncludesMethod checks that the event produced by a POST
// records "POST" in its Method field.
func TestServer_postIncludesMethod(t *testing.T) {
	ts, broker, cancel := newTestServer(nil)
	defer cancel()
	defer ts.Close()

	event := postAndReceive(t, ts, broker, "application/json", `{"ok":true}`)

	if event.Method != "POST" {
		t.Errorf("expected method POST, got %s", event.Method)
	}
}

// TestServer_postMalformedContentTypeFallsBackToBase64 checks that a body sent
// with an unparseable Content-Type header is still accepted and is delivered
// as a base64-encoded string payload.
func TestServer_postMalformedContentTypeFallsBackToBase64(t *testing.T) {
	ts, broker, cancel := newTestServer(nil)
	defer cancel()
	defer ts.Close()

	event := postAndReceive(t, ts, broker, ";;;malformed", "some data")

	s, ok := event.Payload.(string)
	if !ok {
		t.Fatalf("expected string payload, got %T", event.Payload)
	}
	expected := base64.StdEncoding.EncodeToString([]byte("some data"))
	if s != expected {
		t.Errorf("expected base64 %s, got %s", expected, s)
	}
}

// TestIsTextContent is a table-driven test of isTextContent covering media
// types expected to be classified as text, media types expected not to be,
// and a non-text media type that carries a charset parameter.
func TestIsTextContent(t *testing.T) {
	tests := []struct {
		name      string
		mediaType string
		params    map[string]string
		want      bool
	}{
		{name: "text/plain", mediaType: "text/plain", want: true},
		{name: "text/html", mediaType: "text/html", want: true},
		{name: "text/xml", mediaType: "text/xml", want: true},
		{name: "form urlencoded", mediaType: "application/x-www-form-urlencoded", want: true},
		{name: "application/xml", mediaType: "application/xml", want: true},
		{name: "application/xhtml+xml", mediaType: "application/xhtml+xml", want: true},
		{
			// A charset parameter is expected to mark the content as text
			// even when the media type on its own would not.
			name:      "charset param on non-text",
			mediaType: "application/octet-stream",
			params:    map[string]string{"charset": "utf-8"},
			want:      true,
		},
		{name: "application/json", mediaType: "application/json", want: false},
		{name: "application/octet-stream", mediaType: "application/octet-stream", want: false},
		{name: "image/png", mediaType: "image/png", want: false},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := isTextContent(tt.mediaType, tt.params)
			if got != tt.want {
				t.Errorf("isTextContent(%q, %v) = %v, want %v", tt.mediaType, tt.params, got, tt.want)
			}
		})
	}
}

// TestServer_postJSONViaSSERoundTrip subscribes to /test through sseSubscribe,
// POSTs a JSON document to the same path, and checks that the received event
// carries method "POST" and a payload that re-encodes to {"key":"value"}.
func TestServer_postJSONViaSSERoundTrip(t *testing.T) {
	ts, _, cancel := newTestServer(nil)
	defer cancel()
	defer ts.Close()

	ctx, cancelCtx := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelCtx()

	events := sseSubscribe(ctx, ts.URL+"/test", nil)

	// NOTE(review): presumably this pause gives the SSE subscription time to
	// be established before the POST is sent; confirm against sseSubscribe.
	time.Sleep(50 * time.Millisecond)

	// Bind the POST to ctx so a hung server cannot outlive the test timeout,
	// and surface transport errors instead of a misleading timeout below.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, ts.URL+"/test", strings.NewReader(`{"key":"value"}`))
	if err != nil {
		t.Fatalf("building POST request failed: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatalf("POST failed: %v", err)
	}
	defer resp.Body.Close()

	select {
	case event := <-events:
		if event.Method != "POST" {
			t.Errorf("expected method POST in SSE event, got %s", event.Method)
		}

		// Round-trip the payload through JSON so it can be compared as a
		// plain string map regardless of its concrete decoded type.
		data, err := json.Marshal(event.Payload)
		if err != nil {
			t.Fatalf("marshalling payload failed: %v", err)
		}
		var m map[string]string
		if err := json.Unmarshal(data, &m); err != nil {
			t.Fatalf("unmarshalling payload %s failed: %v", data, err)
		}
		if m["key"] != "value" {
			t.Errorf("expected key=value, got %v", m)
		}
	case <-ctx.Done():
		t.Fatal("timed out waiting for SSE event")
	}
}