Rough draft for working out who to send the alert to and allowing users to subscribe via dm #3

closed
opened by willdot.net targeting main from send-alerts
+28
cmd/main.go
··· 12 12 "os/signal" 13 13 "path" 14 14 "syscall" 15 + "time" 15 16 16 17 tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot" 17 18 ··· 56 57 } 57 58 defer database.Close() 58 59 60 + dmService, err := tangledalertbot.NewDmService(database, time.Second*30) 61 + if err != nil { 62 + return fmt.Errorf("create dm service: %w", err) 63 + } 64 + 59 65 ctx, cancel := context.WithCancel(context.Background()) 60 66 defer cancel() 61 67 ··· 63 69 64 70 go startHttpServer(ctx, database) 65 71 72 + go dmService.Start(ctx) 73 + 66 74 <-signals 67 75 cancel() 68 76 ··· 101 109 mux := http.NewServeMux() 102 110 mux.HandleFunc("/issues", srv.handleListIssues) 103 111 mux.HandleFunc("/comments", srv.handleListComments) 112 + mux.HandleFunc("/users", srv.handleListUsers) 104 113 105 114 err := http.ListenAndServe(":3000", mux) 106 115 if err != nil { ··· 149 158 w.Header().Set("Content-Type", "application/json") 150 159 w.Write(b) 151 160 } 161 + 162 + func (s *server) handleListUsers(w http.ResponseWriter, r *http.Request) { 163 + users, err := s.db.GetUsers() 164 + if err != nil { 165 + slog.Error("getting users from DB", "error", err) 166 + http.Error(w, "error getting users from DB", http.StatusInternalServerError) 167 + return 168 + } 169 + 170 + b, err := json.Marshal(users) 171 + if err != nil { 172 + slog.Error("marshalling users from DB", "error", err) 173 + http.Error(w, "marshalling users from DB", http.StatusInternalServerError) 174 + return 175 + } 176 + 177 + w.Header().Set("Content-Type", "application/json") 178 + w.Write(b) 179 + }
+49 -1
consumer.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "strings" 6 7 7 8 "fmt" 8 9 "log/slog" ··· 38 39 DeleteIssue(did, rkey string) error 39 40 DeleteComment(did, rkey string) error 40 41 DeleteCommentsForIssue(issueURI string) error 42 + GetUser(did string) (User, error) 43 + CreateUser(user User) error 41 44 } 42 45 43 46 // JetstreamConsumer is responsible for consuming from a jetstream instance ··· 217 220 } 218 221 219 222 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 223 + didToNotify := getUserToAlert(comment) 224 + if didToNotify == "" { 225 + slog.Info("could not work out did to send alert to", "comment", comment) 226 + return 227 + } 228 + 229 + user, err := h.store.GetUser(didToNotify) 230 + if err != nil { 231 + slog.Error("getting user to send alert to", "error", err, "did", didToNotify) 232 + return 233 + } 220 234 221 - slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) 235 + slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID) 222 236 } 223 237 224 238 func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) { ··· 259 273 260 274 slog.Info("deleted comment ", "did", did, "rkey", rkey) 261 275 } 276 + 277 + // at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22 278 + func getUserToAlert(comment tangled.RepoIssueComment) string { 279 + if comment.ReplyTo != nil { 280 + return getDidFromCommentURI(*comment.ReplyTo) 281 + } 282 + return getDidFromIssueURI(comment.Issue) 283 + } 284 + 285 + func getDidFromCommentURI(uri string) string { 286 + split := strings.Split(uri, tangled.RepoIssueCommentNSID) 287 + if len(split) != 2 { 288 + slog.Error("invalid comment URI received", "uri", uri) 289 + return "" 290 + } 291 + 292 + did := strings.TrimPrefix(split[0], "at://") 293 + did = strings.TrimSuffix(did, "/") 294 + 295 + return did 296 + } 297 + 298 + func getDidFromIssueURI(uri string) string { 299 + split := strings.Split(uri, tangled.RepoIssueNSID) 300 + if len(split) != 2 { 301 + slog.Error("invalid issue URI received", "uri", uri) 302 + return "" 303 + } 304 + 305 + did := strings.TrimPrefix(split[0], "at://") 306 + did = strings.TrimSuffix(did, "/") 307 + 308 + return did 309 + }
+86
database.go
··· 44 44 return nil, fmt.Errorf("creating comments table: %w", err) 45 45 } 46 46 47 + err = createUsersTable(db) 48 + if err != nil { 49 + return nil, fmt.Errorf("creating users table: %w", err) 50 + } 51 + 47 52 return &Database{db: db}, nil 48 53 } 49 54 ··· 122 127 return nil 123 128 } 124 129 130 + func createUsersTable(db *sql.DB) error { 131 + createTableSQL := `CREATE TABLE IF NOT EXISTS users ( 132 + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 133 + "did" TEXT, 134 + "handle" TEXT, 135 + "convoId" TEXT, 136 + "createdAt" integer NOT NULL, 137 + UNIQUE(did) 138 + );` 139 + 140 + slog.Info("Create users table...") 141 + statement, err := db.Prepare(createTableSQL) 142 + if err != nil { 143 + return fmt.Errorf("prepare DB statement to create users table: %w", err) 144 + } 145 + _, err = statement.Exec() 146 + if err != nil { 147 + return fmt.Errorf("exec sql statement to create users table: %w", err) 148 + } 149 + slog.Info("users table created") 150 + 151 + return nil 152 + } 153 + 125 154 // CreateIssue will insert a issue into a database 126 155 func (d *Database) CreateIssue(issue Issue) error { 127 156 sql := `REPLACE INTO issues (authorDid, rkey, title, body, repo, createdAt) VALUES (?, ?, ?, ?, ?, ?);` ··· 142 171 return nil 143 172 } 144 173 174 + // CreateUser will insert a user into a database 175 + func (d *Database) CreateUser(user User) error { 176 + sql := `REPLACE INTO users (did, handle, convoId, createdAt) VALUES (?, ?, ?, ?);` 177 + _, err := d.db.Exec(sql, user.DID, user.Handle, user.ConvoID, user.CreatedAt) 178 + if err != nil { 179 + return fmt.Errorf("exec insert user: %w", err) 180 + } 181 + return nil 182 + } 183 + 145 184 func (d *Database) GetIssues() ([]Issue, error) { 146 185 sql := "SELECT authorDid, rkey, title, body, repo, createdAt FROM issues;" 147 186 rows, err := d.db.Query(sql) ··· 182 221 return results, nil 183 222 } 184 223 224 + func (d *Database) GetUser(did string) (User, error) { 225 + sql := "SELECT did, handle, convoId, createdAt FROM users WHERE did = ?;" 226 + rows, err := d.db.Query(sql, did) 227 + if err != nil { 228 + return User{}, fmt.Errorf("run query to get user: %w", err) 229 + } 230 + defer rows.Close() 231 + 232 + for rows.Next() { 233 + var user User 234 + if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil { 235 + return User{}, fmt.Errorf("scan row: %w", err) 236 + } 237 + 238 + return user, nil 239 + } 240 + return User{}, fmt.Errorf("user not found") 241 + } 242 + 243 + func (d *Database) GetUsers() ([]User, error) { 244 + sql := "SELECT did, handle, convoId, createdAt FROM users;" 245 + rows, err := d.db.Query(sql) 246 + if err != nil { 247 + return nil, fmt.Errorf("run query to get user: %w", err) 248 + } 249 + defer rows.Close() 250 + 251 + var results []User 252 + for rows.Next() { 253 + var user User 254 + if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil { 255 + return nil, fmt.Errorf("scan row: %w", err) 256 + } 257 + results = append(results, user) 258 + } 259 + return results, nil 260 + } 261 + 185 262 func (d *Database) DeleteIssue(did, rkey string) error { 186 263 sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;" 187 264 _, err := d.db.Exec(sql, did, rkey) ··· 208 285 } 209 286 return nil 210 287 } 288 + 289 + func (d *Database) DeleteUser(did string) error { 290 + sql := "DELETE FROM users WHERE did = ?;" 291 + _, err := d.db.Exec(sql, did) 292 + if err != nil { 293 + return fmt.Errorf("exec delete user") 294 + } 295 + return nil 296 + }
+441
dm_handler.go
··· 1 + package tangledalertbot 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "os" 12 + "strings" 13 + "time" 14 + 15 + "github.com/pkg/errors" 16 + ) 17 + 18 + const ( 19 + httpClientTimeoutDuration = time.Second * 5 20 + transportIdleConnTimeoutDuration = time.Second * 90 21 + baseBskyURL = "https://bsky.social/xrpc" 22 + ) 23 + 24 + type auth struct { 25 + AccessJwt string `json:"accessJwt"` 26 + RefershJWT string `json:"refreshJwt"` 27 + Did string `json:"did"` 28 + } 29 + 30 + type accessData struct { 31 + handle string 32 + appPassword string 33 + } 34 + 35 + type ListConvosResponse struct { 36 + Cursor string `json:"cursor"` 37 + Convos []Convo `json:"convos"` 38 + } 39 + 40 + type Convo struct { 41 + ID string `json:"id"` 42 + Members []ConvoMember `json:"members"` 43 + UnreadCount int `json:"unreadCount"` 44 + } 45 + 46 + type ConvoMember struct { 47 + Did string `json:"did"` 48 + Handle string `json:"handle"` 49 + } 50 + 51 + type ErrorResponse struct { 52 + Error string `json:"error"` 53 + } 54 + 55 + type MessageResp struct { 56 + Messages []Message `json:"messages"` 57 + Cursor string `json:"cursor"` 58 + } 59 + 60 + type Message struct { 61 + ID string `json:"id"` 62 + Sender MessageSender `json:"sender"` 63 + Text string `json:"text"` 64 + } 65 + 66 + type MessageSender struct { 67 + Did string `json:"did"` 68 + } 69 + 70 + type UpdateMessageReadRequest struct { 71 + ConvoID string `json:"convoId"` 72 + MessageID string `json:"messageId"` 73 + } 74 + 75 + type User struct { 76 + DID string 77 + Handle string 78 + ConvoID string 79 + CreatedAt int 80 + } 81 + 82 + type DmService struct { 83 + httpClient *http.Client 84 + accessData accessData 85 + auth auth 86 + timerDuration time.Duration 87 + pdsURL string 88 + store Store 89 + } 90 + 91 + func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) { 92 + httpClient := http.Client{ 93 + Timeout: httpClientTimeoutDuration, 94 + Transport: &http.Transport{ 95 + IdleConnTimeout: transportIdleConnTimeoutDuration, 96 + }, 97 + } 98 + 99 + accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE") 100 + accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD") 101 + pdsURL := os.Getenv("MESSAGING_PDS_URL") 102 + 103 + service := DmService{ 104 + httpClient: &httpClient, 105 + accessData: accessData{ 106 + handle: accessHandle, 107 + appPassword: accessAppPassword, 108 + }, 109 + timerDuration: timerDuration, 110 + pdsURL: pdsURL, 111 + store: store, 112 + } 113 + 114 + auth, err := service.Authenicate() 115 + if err != nil { 116 + return nil, fmt.Errorf("authenticating: %w", err) 117 + } 118 + 119 + service.auth = auth 120 + 121 + return &service, nil 122 + } 123 + 124 + func (d *DmService) Start(ctx context.Context) { 125 + go d.RefreshTask(ctx) 126 + 127 + timer := time.NewTimer(d.timerDuration) 128 + defer timer.Stop() 129 + 130 + for { 131 + select { 132 + case <-ctx.Done(): 133 + slog.Warn("context canceled - stopping dm task") 134 + return 135 + case <-timer.C: 136 + err := d.HandleMessageTimer(ctx) 137 + if err != nil { 138 + slog.Error("handle message timer", "error", err) 139 + } 140 + timer.Reset(d.timerDuration) 141 + } 142 + } 143 + } 144 + 145 + func (d *DmService) RefreshTask(ctx context.Context) { 146 + timer := time.NewTimer(time.Hour) 147 + defer timer.Stop() 148 + 149 + for { 150 + select { 151 + case <-ctx.Done(): 152 + return 153 + case <-timer.C: 154 + err := d.RefreshAuthenication(ctx) 155 + if err != nil { 156 + slog.Error("handle refresh auth timer", "error", err) 157 + // TODO: better retry with backoff probably 158 + timer.Reset(time.Minute) 159 + continue 160 + } 161 + timer.Reset(time.Hour) 162 + } 163 + } 164 + } 165 + 166 + func (d *DmService) HandleMessageTimer(ctx context.Context) error { 167 + convoResp, err := d.GetUnreadMessages() 168 + if err != nil { 169 + return fmt.Errorf("get unread messages: %w", err) 170 + } 171 + 172 + // TODO: handle the cursor pagination 173 + 174 + for _, convo := range convoResp.Convos { 175 + if convo.UnreadCount == 0 { 176 + continue 177 + } 178 + 179 + messageResp, err := d.GetMessages(ctx, convo.ID) 180 + if err != nil { 181 + slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID) 182 + continue 183 + } 184 + 185 + unreadCount := convo.UnreadCount 186 + unreadMessages := make([]Message, 0, convo.UnreadCount) 187 + // TODO: handle cursor pagination 188 + for _, msg := range messageResp.Messages { 189 + // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be 190 + // an more unread messages? 191 + if msg.Sender.Did == d.auth.Did { 192 + continue 193 + } 194 + 195 + unreadMessages = append(unreadMessages, msg) 196 + unreadCount-- 197 + if unreadCount == 0 { 198 + break 199 + } 200 + } 201 + 202 + for _, msg := range unreadMessages { 203 + d.handleMessage(msg, convo) 204 + 205 + err = d.MarkMessageRead(msg.ID, convo.ID) 206 + if err != nil { 207 + slog.Error("marking message read", "error", err) 208 + continue 209 + } 210 + } 211 + } 212 + 213 + return nil 214 + } 215 + 216 + func (d *DmService) handleMessage(msg Message, convo Convo) { 217 + // TODO: add or remote user the list of "subsribed" users 218 + if strings.ToLower(msg.Text) == "subscribe" { 219 + userHandle := "" 220 + for _, member := range convo.Members { 221 + if member.Did == msg.Sender.Did { 222 + userHandle = member.Handle 223 + break 224 + } 225 + } 226 + 227 + if userHandle == "" { 228 + slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members) 229 + return 230 + } 231 + 232 + user := User{ 233 + DID: msg.Sender.Did, 234 + ConvoID: convo.ID, 235 + Handle: userHandle, 236 + CreatedAt: int(time.Now().UnixMilli()), 237 + } 238 + 239 + err := d.store.CreateUser(user) 240 + if err != nil { 241 + slog.Error("error creating user", "error", err, "user", user) 242 + return 243 + } 244 + } 245 + } 246 + 247 + func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) { 248 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL) 249 + request, err := http.NewRequest("GET", url, nil) 250 + if err != nil { 251 + return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err) 252 + } 253 + 254 + request.Header.Add("Content-Type", "application/json") 255 + request.Header.Add("Accept", "application/json") 256 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 257 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 258 + 259 + resp, err := d.httpClient.Do(request) 260 + if err != nil { 261 + return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err) 262 + } 263 + defer resp.Body.Close() 264 + 265 + if resp.StatusCode != http.StatusOK { 266 + var errorResp ErrorResponse 267 + err = decodeResp(resp.Body, &errorResp) 268 + if err != nil { 269 + return ListConvosResponse{}, err 270 + } 271 + 272 + return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 273 + } 274 + 275 + var listConvoResp ListConvosResponse 276 + err = decodeResp(resp.Body, &listConvoResp) 277 + if err != nil { 278 + return ListConvosResponse{}, err 279 + } 280 + 281 + return listConvoResp, nil 282 + } 283 + 284 + func (d *DmService) MarkMessageRead(messageID, convoID string) error { 285 + bodyReq := UpdateMessageReadRequest{ 286 + ConvoID: convoID, 287 + MessageID: messageID, 288 + } 289 + 290 + bodyB, err := json.Marshal(bodyReq) 291 + if err != nil { 292 + return fmt.Errorf("marshal update message request body: %w", err) 293 + } 294 + 295 + r := bytes.NewReader(bodyB) 296 + 297 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL) 298 + request, err := http.NewRequest("POST", url, r) 299 + if err != nil { 300 + return fmt.Errorf("create new list convos http request: %w", err) 301 + } 302 + 303 + request.Header.Add("Content-Type", "application/json") 304 + request.Header.Add("Accept", "application/json") 305 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 306 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 307 + 308 + resp, err := d.httpClient.Do(request) 309 + if err != nil { 310 + return fmt.Errorf("do http request to update message read: %w", err) 311 + } 312 + defer resp.Body.Close() 313 + 314 + if resp.StatusCode == http.StatusOK { 315 + return nil 316 + } 317 + 318 + var errorResp ErrorResponse 319 + err = decodeResp(resp.Body, &errorResp) 320 + if err != nil { 321 + return err 322 + } 323 + 324 + return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 325 + 326 + } 327 + 328 + func (d *DmService) Authenicate() (auth, error) { 329 + url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL) 330 + 331 + requestData := map[string]interface{}{ 332 + "identifier": d.accessData.handle, 333 + "password": d.accessData.appPassword, 334 + } 335 + 336 + data, err := json.Marshal(requestData) 337 + if err != nil { 338 + return auth{}, errors.Wrap(err, "failed to marshal request") 339 + } 340 + 341 + r := bytes.NewReader(data) 342 + 343 + request, err := http.NewRequest("POST", url, r) 344 + if err != nil { 345 + return auth{}, errors.Wrap(err, "failed to create request") 346 + } 347 + 348 + request.Header.Add("Content-Type", "application/json") 349 + 350 + resp, err := d.httpClient.Do(request) 351 + if err != nil { 352 + return auth{}, errors.Wrap(err, "failed to make request") 353 + } 354 + defer resp.Body.Close() 355 + 356 + var loginResp auth 357 + err = decodeResp(resp.Body, &loginResp) 358 + if err != nil { 359 + return auth{}, err 360 + } 361 + 362 + return loginResp, nil 363 + } 364 + 365 + func (d *DmService) RefreshAuthenication(ctx context.Context) error { 366 + url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL) 367 + 368 + request, err := http.NewRequest("POST", url, nil) 369 + if err != nil { 370 + return errors.Wrap(err, "failed to create request") 371 + } 372 + 373 + request.Header.Add("Content-Type", "application/json") 374 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT)) 375 + 376 + resp, err := d.httpClient.Do(request) 377 + if err != nil { 378 + return errors.Wrap(err, "failed to make request") 379 + } 380 + defer resp.Body.Close() 381 + 382 + var loginResp auth 383 + err = decodeResp(resp.Body, &loginResp) 384 + if err != nil { 385 + return err 386 + } 387 + 388 + d.auth = loginResp 389 + 390 + return nil 391 + } 392 + 393 + func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) { 394 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID) 395 + request, err := http.NewRequest("GET", url, nil) 396 + if err != nil { 397 + return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err) 398 + } 399 + 400 + request.Header.Add("Content-Type", "application/json") 401 + request.Header.Add("Accept", "application/json") 402 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 403 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 404 + 405 + resp, err := d.httpClient.Do(request) 406 + if err != nil { 407 + return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err) 408 + } 409 + defer resp.Body.Close() 410 + 411 + if resp.StatusCode != http.StatusOK { 412 + var errorResp ErrorResponse 413 + err = decodeResp(resp.Body, &errorResp) 414 + if err != nil { 415 + return MessageResp{}, err 416 + } 417 + 418 + return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 419 + } 420 + 421 + var messageResp MessageResp 422 + err = decodeResp(resp.Body, &messageResp) 423 + if err != nil { 424 + return MessageResp{}, err 425 + } 426 + 427 + return messageResp, nil 428 + } 429 + 430 + func decodeResp(body io.Reader, result any) error { 431 + resBody, err := io.ReadAll(body) 432 + if err != nil { 433 + return errors.Wrap(err, "failed to read response") 434 + } 435 + 436 + err = json.Unmarshal(resBody, result) 437 + if err != nil { 438 + return errors.Wrap(err, "failed to unmarshal response") 439 + } 440 + return nil 441 + }