An open source supporter broker powered by high-fives. high-five.atprotofans.com/

feature: adding buffer to improve quick reconnect handling

+100 -26
+100 -26
internal/websocket/hub.go
··· 10 10 ) 11 11 12 12 const ( 13 - writeWait = 10 * time.Second 14 - pongWait = 60 * time.Second 15 - pingPeriod = (pongWait * 9) / 10 16 - maxMessageSize = 4096 17 - sessionTimeout = 25 * time.Minute // Disconnect after 25 minutes 13 + writeWait = 10 * time.Second 14 + pongWait = 60 * time.Second 15 + pingPeriod = (pongWait * 9) / 10 16 + maxMessageSize = 4096 17 + sessionTimeout = 25 * time.Minute // Disconnect after 25 minutes 18 + leaveDebounceDelay = 10 * time.Second // Wait before broadcasting leave 18 19 ) 19 20 20 21 // MessageType represents the type of websocket message. ··· 58 59 sessionTimer *time.Timer 59 60 } 60 61 62 + // pendingLeave tracks a client that is about to leave with a debounce timer. 63 + type pendingLeave struct { 64 + client *Client 65 + timer *time.Timer 66 + } 67 + 61 68 // Hub maintains the set of active clients and broadcasts messages. 62 69 type Hub struct { 63 - clients map[*Client]bool 64 - didClients map[string]*Client // Map DID to client 65 - broadcast chan []byte 66 - register chan *Client 67 - unregister chan *Client 68 - mu sync.RWMutex 70 + clients map[*Client]bool 71 + didClients map[string]*Client // Map DID to client 72 + pendingLeaves map[string]*pendingLeave // Map DID to pending leave 73 + broadcast chan []byte 74 + register chan *Client 75 + unregister chan *Client 76 + mu sync.RWMutex 69 77 70 78 // Callbacks for handling messages 71 79 onClientJoin func(client *Client) error ··· 76 84 // NewHub creates a new Hub. 77 85 func NewHub() *Hub { 78 86 return &Hub{ 79 - clients: make(map[*Client]bool), 80 - didClients: make(map[string]*Client), 81 - broadcast: make(chan []byte), 82 - register: make(chan *Client), 83 - unregister: make(chan *Client), 87 + clients: make(map[*Client]bool), 88 + didClients: make(map[string]*Client), 89 + pendingLeaves: make(map[string]*pendingLeave), 90 + broadcast: make(chan []byte), 91 + register: make(chan *Client), 92 + unregister: make(chan *Client), 84 93 } 85 94 } 86 95 ··· 101 110 select { 102 111 case client := <-h.register: 103 112 h.mu.Lock() 113 + // Check if there's a pending leave for this DID (reconnect scenario) 114 + cancelledPendingLeave := false 115 + if client.DID != "" { 116 + if pending, ok := h.pendingLeaves[client.DID]; ok { 117 + // Cancel the pending leave timer 118 + pending.timer.Stop() 119 + delete(h.pendingLeaves, client.DID) 120 + cancelledPendingLeave = true 121 + log.Printf("Client reconnected, cancelled pending leave: %s (%s)", client.DID, client.Handle) 122 + 123 + // Close the old client's send channel if it's different 124 + if pending.client != client && pending.client.send != nil { 125 + close(pending.client.send) 126 + } 127 + } 128 + } 129 + 104 130 h.clients[client] = true 105 131 if client.DID != "" { 106 132 h.didClients[client.DID] = client ··· 108 134 h.mu.Unlock() 109 135 log.Printf("Client registered: %s (%s)", client.DID, client.Handle) 110 136 111 - // Call the join callback 137 + // If we cancelled a pending leave, mark as reconnect so join callback 138 + // doesn't broadcast to other clients (they never saw the user leave) 139 + if cancelledPendingLeave { 140 + client.IsReconnect = true 141 + } 142 + 112 143 if h.onClientJoin != nil { 113 144 if err := h.onClientJoin(client); err != nil { 114 145 log.Printf("error in client join callback: %v", err) ··· 119 150 h.mu.Lock() 120 151 if _, ok := h.clients[client]; ok { 121 152 delete(h.clients, client) 153 + 122 154 if client.DID != "" { 155 + // Don't immediately remove from didClients or call leave callback 156 + // Instead, start a debounce timer 123 157 delete(h.didClients, client.DID) 158 + 159 + // Create pending leave with timer 160 + pending := &pendingLeave{ 161 + client: client, 162 + } 163 + pending.timer = time.AfterFunc(leaveDebounceDelay, func() { 164 + h.completePendingLeave(client.DID) 165 + }) 166 + h.pendingLeaves[client.DID] = pending 167 + log.Printf("Client disconnected, starting leave debounce: %s (%s)", client.DID, client.Handle) 168 + } else { 169 + // No DID, close immediately 170 + close(client.send) 171 + h.mu.Unlock() 172 + if h.onClientLeave != nil { 173 + if err := h.onClientLeave(client); err != nil { 174 + log.Printf("error in client leave callback: %v", err) 175 + } 176 + } 177 + continue 124 178 } 125 - close(client.send) 126 179 } 127 180 h.mu.Unlock() 128 - log.Printf("Client unregistered: %s (%s)", client.DID, client.Handle) 129 - 130 - // Call the leave callback 131 - if h.onClientLeave != nil { 132 - if err := h.onClientLeave(client); err != nil { 133 - log.Printf("error in client leave callback: %v", err) 134 - } 135 - } 136 181 137 182 case message := <-h.broadcast: 138 183 h.mu.RLock() ··· 152 197 } 153 198 } 154 199 h.mu.RUnlock() 200 + } 201 + } 202 + } 203 + 204 + // completePendingLeave is called after the debounce delay to finalize a client leave. 205 + func (h *Hub) completePendingLeave(did string) { 206 + h.mu.Lock() 207 + pending, ok := h.pendingLeaves[did] 208 + if !ok { 209 + // Already cancelled (client reconnected) 210 + h.mu.Unlock() 211 + return 212 + } 213 + 214 + delete(h.pendingLeaves, did) 215 + client := pending.client 216 + 217 + // Close the send channel 218 + if client.send != nil { 219 + close(client.send) 220 + } 221 + h.mu.Unlock() 222 + 223 + log.Printf("Client leave confirmed after debounce: %s (%s)", client.DID, client.Handle) 224 + 225 + // Call the leave callback 226 + if h.onClientLeave != nil { 227 + if err := h.onClientLeave(client); err != nil { 228 + log.Printf("error in client leave callback: %v", err) 155 229 } 156 230 } 157 231 }