A feed generator that lets users bookmark Bluesky posts by sending them via DM
1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "os"
12 "strings"
13 "time"
14
15 "github.com/pkg/errors"
16)
17
const (
	// httpClientTimeoutDuration is the Timeout set on the service's HTTP client.
	httpClientTimeoutDuration = 5 * time.Second
	// transportIdleConnTimeoutDuration is the IdleConnTimeout set on the
	// HTTP client's transport.
	transportIdleConnTimeoutDuration = 90 * time.Second
	// baseBskyURL is the XRPC base URL used for the createSession and
	// refreshSession calls.
	baseBskyURL = "https://bsky.social/xrpc"
)
23
// auth holds the session credentials decoded from the createSession and
// refreshSession responses.
type auth struct {
	// AccessJwt is sent as the bearer token on the chat requests.
	AccessJwt string `json:"accessJwt"`
	// RefershJWT is sent as the bearer token when refreshing the session.
	// The field name is misspelled but is referenced elsewhere, so it is kept.
	RefershJWT string `json:"refreshJwt"`
	// Did is compared against message senders to skip the service's own messages.
	Did string `json:"did"`
}
29
// accessData holds the login credentials that are sent as the "identifier"
// and "password" of the createSession request.
type accessData struct {
	// handle is read from MESSAGING_ACCESS_HANDLE.
	handle string
	// appPassword is read from MESSAGING_ACCESS_APP_PASSWORD.
	appPassword string
}
34
35type ListConvosResponse struct {
36 Cursor string `json:"cursor"`
37 Convos []Convo `json:"convos"`
38}
39
40type Convo struct {
41 ID string `json:"id"`
42 Members []ConvoMember `json:"members"`
43 UnreadCount int `json:"unreadCount"`
44}
45
// ConvoMember is one entry of a conversation's "members" list.
type ConvoMember struct {
	Did    string `json:"did"`
	Handle string `json:"handle"`
}
50
// ErrorResponse is the body decoded from non-200 responses; its Error text is
// included in the errors returned to callers.
type ErrorResponse struct {
	Error string `json:"error"`
}
54
55type MessageResp struct {
56 Messages []Message `json:"messages"`
57 Cursor string `json:"cursor"`
58}
59
60type Message struct {
61 ID string `json:"id"`
62 Sender MessageSender `json:"sender"`
63 Text string `json:"text"`
64 Embed MessageEmbed `json:"embed"`
65}
66
67type MessageEmbed struct {
68 Record MessageEmbedRecord `json:"record"`
69}
70
71type MessageEmbedRecord struct {
72 URI string `json:"uri"`
73 Author MessageEmbedRecordAuthor `json:"author"`
74 Value MessageEmbedPost `json:"value"`
75}
76
// MessageEmbedRecordAuthor identifies the author of an embedded post.
type MessageEmbedRecordAuthor struct {
	Did string `json:"did"`
	// Handle is used to build the public bsky.app link of the post.
	Handle string `json:"handle"`
}
81
// MessageEmbedPost is the content of an embedded post; its Text is what gets
// stored (shortened) as the bookmark's content.
type MessageEmbedPost struct {
	Text string `json:"text"`
}
85
// MessageSender identifies the account that sent a message.
type MessageSender struct {
	Did string `json:"did"`
}
89
// UpdateMessageReadRequest is the JSON body posted to
// chat.bsky.convo.updateRead.
type UpdateMessageReadRequest struct {
	ConvoID   string `json:"convoId"`
	MessageID string `json:"messageId"`
}
94
95type DmService struct {
96 httpClient *http.Client
97 accessData accessData
98 auth auth
99 timerDuration time.Duration
100 pdsURL string
101 bookmarkStore BookmarkStore
102}
103
104func NewDmService(bookmarkStore BookmarkStore, timerDuration time.Duration) (*DmService, error) {
105 httpClient := http.Client{
106 Timeout: httpClientTimeoutDuration,
107 Transport: &http.Transport{
108 IdleConnTimeout: transportIdleConnTimeoutDuration,
109 },
110 }
111
112 accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE")
113 accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD")
114 pdsURL := os.Getenv("MESSAGING_PDS_URL")
115
116 service := DmService{
117 httpClient: &httpClient,
118 accessData: accessData{
119 handle: accessHandle,
120 appPassword: accessAppPassword,
121 },
122 timerDuration: timerDuration,
123 pdsURL: pdsURL,
124 bookmarkStore: bookmarkStore,
125 }
126
127 auth, err := service.Authenicate()
128 if err != nil {
129 return nil, fmt.Errorf("authenticating: %w", err)
130 }
131
132 service.auth = auth
133
134 return &service, nil
135}
136
137func (d *DmService) Start(ctx context.Context) {
138 go d.RefreshTask(ctx)
139
140 timer := time.NewTimer(d.timerDuration)
141 defer timer.Stop()
142
143 for {
144 select {
145 case <-ctx.Done():
146 slog.Warn("context canceled - stopping dm task")
147 return
148 case <-timer.C:
149 err := d.HandleMessageTimer(ctx)
150 if err != nil {
151 slog.Error("handle message timer", "error", err)
152 }
153 timer.Reset(d.timerDuration)
154 }
155 }
156}
157
158func (d *DmService) HandleMessageTimer(ctx context.Context) error {
159 convoResp, err := d.GetUnreadMessages()
160 if err != nil {
161 return fmt.Errorf("get unread messages: %w", err)
162 }
163
164 // TODO: handle the cursor pagination
165
166 for _, convo := range convoResp.Convos {
167 if convo.UnreadCount == 0 {
168 continue
169 }
170
171 messageResp, err := d.GetMessages(ctx, convo.ID)
172 if err != nil {
173 slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID)
174 continue
175 }
176
177 unreadCount := convo.UnreadCount
178 unreadMessages := make([]Message, 0, convo.UnreadCount)
179 // TODO: handle cursor pagination
180 for _, msg := range messageResp.Messages {
181 // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be
182 // an more unread messages?
183 if msg.Sender.Did == d.auth.Did {
184 continue
185 }
186
187 unreadMessages = append(unreadMessages, msg)
188 unreadCount--
189 if unreadCount == 0 {
190 break
191 }
192 }
193
194 for _, msg := range unreadMessages {
195 d.handleMessage(msg)
196
197 err = d.MarkMessageRead(msg.ID, convo.ID)
198 if err != nil {
199 slog.Error("marking message read", "error", err)
200 continue
201 }
202 }
203 }
204
205 return nil
206}
207
208func (d *DmService) handleMessage(msg Message) {
209 // for now, ignore messages that don't have linked posts in them
210 if msg.Embed.Record.URI == "" {
211 return
212 }
213
214 rkey := getRKeyFromATURI(msg.Embed.Record.URI)
215 msgAction := strings.ToLower(msg.Text)
216
217 var err error
218
219 switch {
220 case strings.Contains(msgAction, "delete"):
221 err = d.handleDeleteBookmark(msg)
222 default:
223 err = d.handleCreateBookmark(msg)
224 }
225
226 if err != nil {
227 // TODO: perhaps continue here so that we don't mark the message as read so it can be tried again? Or perhaps send a message
228 // too the user?
229 slog.Error("failed to handle bookmark message", "error", err, "rkey", rkey, "sender", msg.Sender.Did)
230 }
231}
232
233func (d *DmService) handleCreateBookmark(msg Message) error {
234 content := msg.Embed.Record.Value.Text
235 if content == "" {
236 content = "post contained no text"
237 }
238 if len(content) > 75 {
239 content = fmt.Sprintf("%s...", content[:75])
240 }
241
242 publicURI := getPublicPostURIFromATURI(msg.Embed.Record.URI, msg.Embed.Record.Author.Handle)
243
244 rkey := getRKeyFromATURI(msg.Embed.Record.URI)
245
246 err := d.bookmarkStore.CreateBookmark(rkey, publicURI, msg.Embed.Record.URI, msg.Embed.Record.Author.Did, msg.Embed.Record.Author.Handle, msg.Sender.Did, content, time.Now().UnixMilli())
247 if err != nil {
248 return fmt.Errorf("creating bookmark: %w", err)
249 }
250 return nil
251}
252
253func (d *DmService) handleDeleteBookmark(msg Message) error {
254 rkey := getRKeyFromATURI(msg.Embed.Record.URI)
255
256 err := d.bookmarkStore.DeleteRepliedPostsForBookmarkedPostURIandUserDID(msg.Embed.Record.URI, msg.Sender.Did)
257 if err != nil {
258 return fmt.Errorf("failed to delete replied posts for bookmark for user: %w", err)
259 }
260
261 err = d.bookmarkStore.DeleteBookmark(rkey, msg.Sender.Did)
262 if err != nil {
263 return fmt.Errorf("failed to delete bookmark: %w", err)
264 }
265
266 return nil
267}
268
// getPublicPostURIFromATURI converts the AT URI of a post into its public
// bsky.app link for authorHandle. It takes the part of atURI that follows
// "app.bsky.feed.post" and appends it to the author's profile URL. When atURI
// does not contain that collection name, the problem is logged and an empty
// string is returned.
func getPublicPostURIFromATURI(atURI, authorHandle string) string {
	const postCollection = "app.bsky.feed.post"

	// Only the segment right after the first collection name is needed.
	parts := strings.SplitN(atURI, postCollection, 3)
	if len(parts) >= 2 {
		return "https://bsky.app/profile/" + authorHandle + "/post" + parts[1]
	}

	slog.Error("can't get public post URI from AT uri", "at uri", atURI)
	return ""
}
277
278func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) {
279 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL)
280 request, err := http.NewRequest("GET", url, nil)
281 if err != nil {
282 return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err)
283 }
284
285 request.Header.Add("Content-Type", "application/json")
286 request.Header.Add("Accept", "application/json")
287 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
288 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
289
290 resp, err := d.httpClient.Do(request)
291 if err != nil {
292 return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err)
293 }
294 defer resp.Body.Close()
295
296 if resp.StatusCode != http.StatusOK {
297 var errorResp ErrorResponse
298 err = decodeResp(resp.Body, &errorResp)
299 if err != nil {
300 return ListConvosResponse{}, err
301 }
302
303 return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
304 }
305
306 var listConvoResp ListConvosResponse
307 err = decodeResp(resp.Body, &listConvoResp)
308 if err != nil {
309 return ListConvosResponse{}, err
310 }
311
312 return listConvoResp, nil
313}
314
315func (d *DmService) MarkMessageRead(messageID, convoID string) error {
316 bodyReq := UpdateMessageReadRequest{
317 ConvoID: convoID,
318 MessageID: messageID,
319 }
320
321 bodyB, err := json.Marshal(bodyReq)
322 if err != nil {
323 return fmt.Errorf("marshal update message request body: %w", err)
324 }
325
326 r := bytes.NewReader(bodyB)
327
328 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL)
329 request, err := http.NewRequest("POST", url, r)
330 if err != nil {
331 return fmt.Errorf("create new list convos http request: %w", err)
332 }
333
334 request.Header.Add("Content-Type", "application/json")
335 request.Header.Add("Accept", "application/json")
336 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
337 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
338
339 resp, err := d.httpClient.Do(request)
340 if err != nil {
341 return fmt.Errorf("do http request to update message read: %w", err)
342 }
343 defer resp.Body.Close()
344
345 if resp.StatusCode == http.StatusOK {
346 return nil
347 }
348
349 var errorResp ErrorResponse
350 err = decodeResp(resp.Body, &errorResp)
351 if err != nil {
352 return err
353 }
354
355 return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
356
357}
358
359func (d *DmService) Authenicate() (auth, error) {
360 url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL)
361
362 requestData := map[string]interface{}{
363 "identifier": d.accessData.handle,
364 "password": d.accessData.appPassword,
365 }
366
367 data, err := json.Marshal(requestData)
368 if err != nil {
369 return auth{}, errors.Wrap(err, "failed to marshal request")
370 }
371
372 r := bytes.NewReader(data)
373
374 request, err := http.NewRequest("POST", url, r)
375 if err != nil {
376 return auth{}, errors.Wrap(err, "failed to create request")
377 }
378
379 request.Header.Add("Content-Type", "application/json")
380
381 resp, err := d.httpClient.Do(request)
382 if err != nil {
383 return auth{}, errors.Wrap(err, "failed to make request")
384 }
385 defer resp.Body.Close()
386
387 var loginResp auth
388 err = decodeResp(resp.Body, &loginResp)
389 if err != nil {
390 return auth{}, err
391 }
392
393 return loginResp, nil
394}
395
396func (d *DmService) RefreshTask(ctx context.Context) {
397 timer := time.NewTimer(time.Hour)
398 defer timer.Stop()
399
400 for {
401 select {
402 case <-ctx.Done():
403 return
404 case <-timer.C:
405 err := d.RefreshAuthenication(ctx)
406 if err != nil {
407 slog.Error("handle refresh auth timer", "error", err)
408 // TODO: better retry with backoff probably
409 timer.Reset(time.Minute)
410 continue
411 }
412 timer.Reset(time.Hour)
413 }
414 }
415}
416
417func (d *DmService) RefreshAuthenication(ctx context.Context) error {
418 url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL)
419
420 request, err := http.NewRequest("POST", url, nil)
421 if err != nil {
422 return errors.Wrap(err, "failed to create request")
423 }
424
425 request.Header.Add("Content-Type", "application/json")
426 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT))
427
428 resp, err := d.httpClient.Do(request)
429 if err != nil {
430 return errors.Wrap(err, "failed to make request")
431 }
432 defer resp.Body.Close()
433
434 var loginResp auth
435 err = decodeResp(resp.Body, &loginResp)
436 if err != nil {
437 return err
438 }
439
440 d.auth = loginResp
441
442 return nil
443}
444
445func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) {
446 url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID)
447 request, err := http.NewRequest("GET", url, nil)
448 if err != nil {
449 return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err)
450 }
451
452 request.Header.Add("Content-Type", "application/json")
453 request.Header.Add("Accept", "application/json")
454 request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
455 request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
456
457 resp, err := d.httpClient.Do(request)
458 if err != nil {
459 return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err)
460 }
461 defer resp.Body.Close()
462
463 if resp.StatusCode != http.StatusOK {
464 var errorResp ErrorResponse
465 err = decodeResp(resp.Body, &errorResp)
466 if err != nil {
467 return MessageResp{}, err
468 }
469
470 return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
471 }
472
473 var messageResp MessageResp
474 err = decodeResp(resp.Body, &messageResp)
475 if err != nil {
476 return MessageResp{}, err
477 }
478
479 return messageResp, nil
480}
481
482func decodeResp(body io.Reader, result any) error {
483 resBody, err := io.ReadAll(body)
484 if err != nil {
485 return errors.Wrap(err, "failed to read response")
486 }
487
488 err = json.Unmarshal(resBody, result)
489 if err != nil {
490 return errors.Wrap(err, "failed to unmarshal response")
491 }
492 return nil
493}