+27
-14
internal/pubsub.go
+27
-14
internal/pubsub.go
···
6
6
"fmt"
7
7
"net/http"
8
8
"slices"
9
+
"sync"
9
10
"time"
10
11
)
11
12
···
16
17
17
18
type PubSub struct {
18
19
topics map[string][]*SubClients
20
+
mu sync.RWMutex
19
21
}
20
22
21
23
func NewPubSub() *PubSub {
···
23
25
}
24
26
25
27
func (p *PubSub) getOrCreateTopic(topicName, requestId string) *SubClients {
28
+
p.mu.Lock()
29
+
defer p.mu.Unlock()
30
+
26
31
clients, exists := p.topics[topicName]
27
32
if exists == false {
28
33
clients = []*SubClients{}
···
45
50
return currClient
46
51
}
47
52
53
+
func (p *PubSub) clientDisconnect(topicName, requestId string) {
54
+
p.mu.Lock()
55
+
defer p.mu.Unlock()
56
+
57
+
var index int
58
+
for i, client := range p.topics[topicName] {
59
+
if client.id == requestId {
60
+
index = i
61
+
break
62
+
}
63
+
}
64
+
65
+
p.topics[topicName] = slices.Delete(p.topics[topicName], index, index+1)
66
+
67
+
if len(p.topics[topicName]) == 0 {
68
+
delete(p.topics, topicName)
69
+
}
70
+
}
71
+
48
72
func (p *PubSub) HandleSubscribe(w http.ResponseWriter, r *http.Request) {
49
73
topicName := r.URL.Query().Get("topic")
50
-
clientInTopic := p.getOrCreateTopic(topicName, r.Context().Value("REQUEST_ID").(string))
74
+
requestId := r.Context().Value("REQUEST_ID").(string)
75
+
clientInTopic := p.getOrCreateTopic(topicName, requestId)
51
76
52
77
w.Header().Set("Content-Type", "text/event-stream")
53
78
w.Header().Set("Connection", "keep-alive")
···
59
84
case <-r.Context().Done():
60
85
fmt.Println("Client disconnected")
61
86
62
-
var index int
63
-
for i, client := range p.topics[topicName] {
64
-
if client.id == clientInTopic.id {
65
-
index = i
66
-
break
67
-
}
68
-
}
69
-
70
-
p.topics[topicName] = slices.Delete(p.topics[topicName], index, index+1)
71
-
72
-
if len(p.topics[topicName]) == 0 {
73
-
delete(p.topics, topicName)
74
-
}
87
+
p.clientDisconnect(topicName, requestId)
75
88
76
89
return
77
90
case data := <-clientInTopic.channel: