Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "strings"
6 "sync"
7)
8
9type MemoryBackend struct {
10 mu sync.RWMutex
11 buffer *RingBuffer
12 listeners []chan *Event
13}
14
15func NewMemoryBackend(bufferSize int) *MemoryBackend {
16 return &MemoryBackend{
17 buffer: NewRingBuffer(bufferSize),
18 }
19}
20
21func (m *MemoryBackend) Publish(event *Event) error {
22 m.buffer.Add(event)
23
24 m.mu.RLock()
25 defer m.mu.RUnlock()
26
27 for _, ch := range m.listeners {
28 select {
29 case ch <- event:
30 default:
31 }
32 }
33 return nil
34}
35
36func (m *MemoryBackend) Subscribe(ctx context.Context) <-chan *Event {
37 ch := make(chan *Event, 256)
38
39 m.mu.Lock()
40 m.listeners = append(m.listeners, ch)
41 m.mu.Unlock()
42
43 go func() {
44 <-ctx.Done()
45 m.mu.Lock()
46 defer m.mu.Unlock()
47 for i, l := range m.listeners {
48 if l == ch {
49 m.listeners = append(m.listeners[:i], m.listeners[i+1:]...)
50 break
51 }
52 }
53 }()
54
55 return ch
56}
57
58func (m *MemoryBackend) Since(lastEventID string, subscribePath string) []*Event {
59 return m.buffer.Since(lastEventID, subscribePath)
60}
61
62func (m *MemoryBackend) Close() error {
63 return nil
64}
65
66type RingBuffer struct {
67 mu sync.RWMutex
68 buf []*Event
69 size int
70 write int
71 count int
72}
73
74func NewRingBuffer(size int) *RingBuffer {
75 return &RingBuffer{
76 buf: make([]*Event, size),
77 size: size,
78 }
79}
80
81func (rb *RingBuffer) Add(event *Event) {
82 rb.mu.Lock()
83 defer rb.mu.Unlock()
84 rb.buf[rb.write%rb.size] = event
85 rb.write++
86 if rb.count < rb.size {
87 rb.count++
88 }
89}
90
91func (rb *RingBuffer) Since(lastEventID string, subscribePath string) []*Event {
92 rb.mu.RLock()
93 defer rb.mu.RUnlock()
94
95 start := rb.write - rb.count
96 found := false
97 foundIdx := start
98
99 for i := start; i < rb.write; i++ {
100 e := rb.buf[i%rb.size]
101 if e.ID == lastEventID {
102 found = true
103 foundIdx = i + 1
104 break
105 }
106 }
107
108 if !found {
109 foundIdx = start
110 }
111
112 var result []*Event
113 for i := foundIdx; i < rb.write; i++ {
114 e := rb.buf[i%rb.size]
115 if pathMatches(subscribePath, e.Path) {
116 result = append(result, e)
117 }
118 }
119 return result
120}
121
// pathMatches reports whether an event published at eventPath is visible to a
// subscription on subscribePath. An empty subscribePath sees every event;
// otherwise eventPath must equal subscribePath or continue it with a "/".
func pathMatches(subscribePath, eventPath string) bool {
	if subscribePath == "" {
		return true
	}
	if !strings.HasPrefix(eventPath, subscribePath) {
		return false
	}
	// What follows the shared prefix decides: nothing means an exact match,
	// a leading '/' means eventPath is nested below subscribePath.
	rest := eventPath[len(subscribePath):]
	return rest == "" || rest[0] == '/'
}