A feed generator that allows Bluesky bookmarks via DMs
at main 12 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 "strings" 13 "time" 14 15 "github.com/pkg/errors" 16) 17 18const ( 19 httpClientTimeoutDuration = time.Second * 5 20 transportIdleConnTimeoutDuration = time.Second * 90 21 baseBskyURL = "https://bsky.social/xrpc" 22) 23 24type auth struct { 25 AccessJwt string `json:"accessJwt"` 26 RefershJWT string `json:"refreshJwt"` 27 Did string `json:"did"` 28} 29 30type accessData struct { 31 handle string 32 appPassword string 33} 34 35type ListConvosResponse struct { 36 Cursor string `json:"cursor"` 37 Convos []Convo `json:"convos"` 38} 39 40type Convo struct { 41 ID string `json:"id"` 42 Members []ConvoMember `json:"members"` 43 UnreadCount int `json:"unreadCount"` 44} 45 46type ConvoMember struct { 47 Did string `json:"did"` 48 Handle string `json:"handle"` 49} 50 51type ErrorResponse struct { 52 Error string `json:"error"` 53} 54 55type MessageResp struct { 56 Messages []Message `json:"messages"` 57 Cursor string `json:"cursor"` 58} 59 60type Message struct { 61 ID string `json:"id"` 62 Sender MessageSender `json:"sender"` 63 Text string `json:"text"` 64 Embed MessageEmbed `json:"embed"` 65} 66 67type MessageEmbed struct { 68 Record MessageEmbedRecord `json:"record"` 69} 70 71type MessageEmbedRecord struct { 72 URI string `json:"uri"` 73 Author MessageEmbedRecordAuthor `json:"author"` 74 Value MessageEmbedPost `json:"value"` 75} 76 77type MessageEmbedRecordAuthor struct { 78 Did string `json:"did"` 79 Handle string `json:"handle"` 80} 81 82type MessageEmbedPost struct { 83 Text string `json:"text"` 84} 85 86type MessageSender struct { 87 Did string `json:"did"` 88} 89 90type UpdateMessageReadRequest struct { 91 ConvoID string `json:"convoId"` 92 MessageID string `json:"messageId"` 93} 94 95type DmService struct { 96 httpClient *http.Client 97 accessData accessData 98 auth auth 99 timerDuration time.Duration 100 pdsURL string 101 bookmarkStore BookmarkStore 102} 103 104func NewDmService(bookmarkStore BookmarkStore, timerDuration time.Duration) (*DmService, error) { 105 httpClient := http.Client{ 106 Timeout: httpClientTimeoutDuration, 107 Transport: &http.Transport{ 108 IdleConnTimeout: transportIdleConnTimeoutDuration, 109 }, 110 } 111 112 accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE") 113 accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD") 114 pdsURL := os.Getenv("MESSAGING_PDS_URL") 115 116 service := DmService{ 117 httpClient: &httpClient, 118 accessData: accessData{ 119 handle: accessHandle, 120 appPassword: accessAppPassword, 121 }, 122 timerDuration: timerDuration, 123 pdsURL: pdsURL, 124 bookmarkStore: bookmarkStore, 125 } 126 127 auth, err := service.Authenicate() 128 if err != nil { 129 return nil, fmt.Errorf("authenticating: %w", err) 130 } 131 132 service.auth = auth 133 134 return &service, nil 135} 136 137func (d *DmService) Start(ctx context.Context) { 138 go d.RefreshTask(ctx) 139 140 timer := time.NewTimer(d.timerDuration) 141 defer timer.Stop() 142 143 for { 144 select { 145 case <-ctx.Done(): 146 slog.Warn("context canceled - stopping dm task") 147 return 148 case <-timer.C: 149 err := d.HandleMessageTimer(ctx) 150 if err != nil { 151 slog.Error("handle message timer", "error", err) 152 } 153 timer.Reset(d.timerDuration) 154 } 155 } 156} 157 158func (d *DmService) HandleMessageTimer(ctx context.Context) error { 159 convoResp, err := d.GetUnreadMessages() 160 if err != nil { 161 return fmt.Errorf("get unread messages: %w", err) 162 } 163 164 // TODO: handle the cursor pagination 165 166 for _, convo := range convoResp.Convos { 167 if convo.UnreadCount == 0 { 168 continue 169 } 170 171 messageResp, err := d.GetMessages(ctx, convo.ID) 172 if err != nil { 173 slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID) 174 continue 175 } 176 177 unreadCount := convo.UnreadCount 178 unreadMessages := make([]Message, 0, convo.UnreadCount) 179 // TODO: handle cursor pagination 180 for _, msg := range messageResp.Messages { 181 // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be 182 // an more unread messages? 183 if msg.Sender.Did == d.auth.Did { 184 continue 185 } 186 187 unreadMessages = append(unreadMessages, msg) 188 unreadCount-- 189 if unreadCount == 0 { 190 break 191 } 192 } 193 194 for _, msg := range unreadMessages { 195 d.handleMessage(msg) 196 197 err = d.MarkMessageRead(msg.ID, convo.ID) 198 if err != nil { 199 slog.Error("marking message read", "error", err) 200 continue 201 } 202 } 203 } 204 205 return nil 206} 207 208func (d *DmService) handleMessage(msg Message) { 209 // for now, ignore messages that don't have linked posts in them 210 if msg.Embed.Record.URI == "" { 211 return 212 } 213 214 rkey := getRKeyFromATURI(msg.Embed.Record.URI) 215 msgAction := strings.ToLower(msg.Text) 216 217 var err error 218 219 switch { 220 case strings.Contains(msgAction, "delete"): 221 err = d.handleDeleteBookmark(msg) 222 default: 223 err = d.handleCreateBookmark(msg) 224 } 225 226 if err != nil { 227 // TODO: perhaps continue here so that we don't mark the message as read so it can be tried again? Or perhaps send a message 228 // too the user? 229 slog.Error("failed to handle bookmark message", "error", err, "rkey", rkey, "sender", msg.Sender.Did) 230 } 231} 232 233func (d *DmService) handleCreateBookmark(msg Message) error { 234 content := msg.Embed.Record.Value.Text 235 if content == "" { 236 content = "post contained no text" 237 } 238 if len(content) > 75 { 239 content = fmt.Sprintf("%s...", content[:75]) 240 } 241 242 publicURI := getPublicPostURIFromATURI(msg.Embed.Record.URI, msg.Embed.Record.Author.Handle) 243 244 rkey := getRKeyFromATURI(msg.Embed.Record.URI) 245 246 err := d.bookmarkStore.CreateBookmark(rkey, publicURI, msg.Embed.Record.URI, msg.Embed.Record.Author.Did, msg.Embed.Record.Author.Handle, msg.Sender.Did, content, time.Now().UnixMilli()) 247 if err != nil { 248 return fmt.Errorf("creating bookmark: %w", err) 249 } 250 return nil 251} 252 253func (d *DmService) handleDeleteBookmark(msg Message) error { 254 rkey := getRKeyFromATURI(msg.Embed.Record.URI) 255 256 err := d.bookmarkStore.DeleteRepliedPostsForBookmarkedPostURIandUserDID(msg.Embed.Record.URI, msg.Sender.Did) 257 if err != nil { 258 return fmt.Errorf("failed to delete replied posts for bookmark for user: %w", err) 259 } 260 261 err = d.bookmarkStore.DeleteBookmark(rkey, msg.Sender.Did) 262 if err != nil { 263 return fmt.Errorf("failed to delete bookmark: %w", err) 264 } 265 266 return nil 267} 268 269func getPublicPostURIFromATURI(atURI, authorHandle string) string { 270 atSplit := strings.Split(atURI, "app.bsky.feed.post") 271 if len(atSplit) < 2 { 272 slog.Error("can't get public post URI from AT uri", "at uri", atURI) 273 return "" 274 } 275 return fmt.Sprintf("https://bsky.app/profile/%s/post%s", authorHandle, atSplit[1]) 276} 277 278func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) { 279 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL) 280 request, err := http.NewRequest("GET", url, nil) 281 if err != nil { 282 return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err) 283 } 284 285 request.Header.Add("Content-Type", "application/json") 286 request.Header.Add("Accept", "application/json") 287 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 288 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 289 290 resp, err := d.httpClient.Do(request) 291 if err != nil { 292 return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err) 293 } 294 defer resp.Body.Close() 295 296 if resp.StatusCode != http.StatusOK { 297 var errorResp ErrorResponse 298 err = decodeResp(resp.Body, &errorResp) 299 if err != nil { 300 return ListConvosResponse{}, err 301 } 302 303 return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 304 } 305 306 var listConvoResp ListConvosResponse 307 err = decodeResp(resp.Body, &listConvoResp) 308 if err != nil { 309 return ListConvosResponse{}, err 310 } 311 312 return listConvoResp, nil 313} 314 315func (d *DmService) MarkMessageRead(messageID, convoID string) error { 316 bodyReq := UpdateMessageReadRequest{ 317 ConvoID: convoID, 318 MessageID: messageID, 319 } 320 321 bodyB, err := json.Marshal(bodyReq) 322 if err != nil { 323 return fmt.Errorf("marshal update message request body: %w", err) 324 } 325 326 r := bytes.NewReader(bodyB) 327 328 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL) 329 request, err := http.NewRequest("POST", url, r) 330 if err != nil { 331 return fmt.Errorf("create new list convos http request: %w", err) 332 } 333 334 request.Header.Add("Content-Type", "application/json") 335 request.Header.Add("Accept", "application/json") 336 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 337 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 338 339 resp, err := d.httpClient.Do(request) 340 if err != nil { 341 return fmt.Errorf("do http request to update message read: %w", err) 342 } 343 defer resp.Body.Close() 344 345 if resp.StatusCode == http.StatusOK { 346 return nil 347 } 348 349 var errorResp ErrorResponse 350 err = decodeResp(resp.Body, &errorResp) 351 if err != nil { 352 return err 353 } 354 355 return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 356 357} 358 359func (d *DmService) Authenicate() (auth, error) { 360 url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL) 361 362 requestData := map[string]interface{}{ 363 "identifier": d.accessData.handle, 364 "password": d.accessData.appPassword, 365 } 366 367 data, err := json.Marshal(requestData) 368 if err != nil { 369 return auth{}, errors.Wrap(err, "failed to marshal request") 370 } 371 372 r := bytes.NewReader(data) 373 374 request, err := http.NewRequest("POST", url, r) 375 if err != nil { 376 return auth{}, errors.Wrap(err, "failed to create request") 377 } 378 379 request.Header.Add("Content-Type", "application/json") 380 381 resp, err := d.httpClient.Do(request) 382 if err != nil { 383 return auth{}, errors.Wrap(err, "failed to make request") 384 } 385 defer resp.Body.Close() 386 387 var loginResp auth 388 err = decodeResp(resp.Body, &loginResp) 389 if err != nil { 390 return auth{}, err 391 } 392 393 return loginResp, nil 394} 395 396func (d *DmService) RefreshTask(ctx context.Context) { 397 timer := time.NewTimer(time.Hour) 398 defer timer.Stop() 399 400 for { 401 select { 402 case <-ctx.Done(): 403 return 404 case <-timer.C: 405 err := d.RefreshAuthenication(ctx) 406 if err != nil { 407 slog.Error("handle refresh auth timer", "error", err) 408 // TODO: better retry with backoff probably 409 timer.Reset(time.Minute) 410 continue 411 } 412 timer.Reset(time.Hour) 413 } 414 } 415} 416 417func (d *DmService) RefreshAuthenication(ctx context.Context) error { 418 url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL) 419 420 request, err := http.NewRequest("POST", url, nil) 421 if err != nil { 422 return errors.Wrap(err, "failed to create request") 423 } 424 425 request.Header.Add("Content-Type", "application/json") 426 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT)) 427 428 resp, err := d.httpClient.Do(request) 429 if err != nil { 430 return errors.Wrap(err, "failed to make request") 431 } 432 defer resp.Body.Close() 433 434 var loginResp auth 435 err = decodeResp(resp.Body, &loginResp) 436 if err != nil { 437 return err 438 } 439 440 d.auth = loginResp 441 442 return nil 443} 444 445func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) { 446 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID) 447 request, err := http.NewRequest("GET", url, nil) 448 if err != nil { 449 return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err) 450 } 451 452 request.Header.Add("Content-Type", "application/json") 453 request.Header.Add("Accept", "application/json") 454 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 455 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 456 457 resp, err := d.httpClient.Do(request) 458 if err != nil { 459 return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err) 460 } 461 defer resp.Body.Close() 462 463 if resp.StatusCode != http.StatusOK { 464 var errorResp ErrorResponse 465 err = decodeResp(resp.Body, &errorResp) 466 if err != nil { 467 return MessageResp{}, err 468 } 469 470 return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 471 } 472 473 var messageResp MessageResp 474 err = decodeResp(resp.Body, &messageResp) 475 if err != nil { 476 return MessageResp{}, err 477 } 478 479 return messageResp, nil 480} 481 482func decodeResp(body io.Reader, result any) error { 483 resBody, err := io.ReadAll(body) 484 if err != nil { 485 return errors.Wrap(err, "failed to read response") 486 } 487 488 err = json.Unmarshal(resBody, result) 489 if err != nil { 490 return errors.Wrap(err, "failed to unmarshal response") 491 } 492 return nil 493}