+28
-2
cmd/main.go
+28
-2
cmd/main.go
···
12
12
"os/signal"
13
13
"path"
14
14
"syscall"
15
+
"time"
15
16
16
17
tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot"
17
18
···
56
57
}
57
58
defer database.Close()
58
59
60
+
dmService, err := tangledalertbot.NewDmService(database, time.Second*30)
61
+
if err != nil {
62
+
return fmt.Errorf("create dm service: %w", err)
63
+
}
64
+
59
65
ctx, cancel := context.WithCancel(context.Background())
60
66
defer cancel()
61
67
62
68
go consumeLoop(ctx, database)
63
69
64
70
go startHttpServer(ctx, database)
71
+
72
+
go dmService.Start(ctx)
65
73
66
74
<-signals
67
75
cancel()
···
86
94
if errors.Is(err, context.Canceled) {
87
95
return nil
88
96
}
89
-
slog.Error("consume loop", "error", err)
90
-
bugsnag.Notify(err)
91
97
return err
92
98
}
93
99
return nil
···
103
109
mux := http.NewServeMux()
104
110
mux.HandleFunc("/issues", srv.handleListIssues)
105
111
mux.HandleFunc("/comments", srv.handleListComments)
112
+
mux.HandleFunc("/users", srv.handleListUsers)
106
113
107
114
err := http.ListenAndServe(":3000", mux)
108
115
if err != nil {
···
151
158
w.Header().Set("Content-Type", "application/json")
152
159
w.Write(b)
153
160
}
161
+
162
+
func (s *server) handleListUsers(w http.ResponseWriter, r *http.Request) {
163
+
users, err := s.db.GetUsers()
164
+
if err != nil {
165
+
slog.Error("getting users from DB", "error", err)
166
+
http.Error(w, "error getting users from DB", http.StatusInternalServerError)
167
+
return
168
+
}
169
+
170
+
b, err := json.Marshal(users)
171
+
if err != nil {
172
+
slog.Error("marshalling users from DB", "error", err)
173
+
http.Error(w, "marshalling users from DB", http.StatusInternalServerError)
174
+
return
175
+
}
176
+
177
+
w.Header().Set("Content-Type", "application/json")
178
+
w.Write(b)
179
+
}
+128
-20
consumer.go
+128
-20
consumer.go
···
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+
"strings"
6
7
7
8
"fmt"
8
9
"log/slog"
···
12
13
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13
14
"github.com/bluesky-social/jetstream/pkg/models"
14
15
"github.com/bugsnag/bugsnag-go"
15
-
"tangled.sh/tangled.sh/core/api/tangled"
16
+
"tangled.org/core/api/tangled"
16
17
)
17
18
18
19
type Issue struct {
···
29
30
RKey string `json:"rkey"`
30
31
Body string `json:"body"`
31
32
Issue string `json:"issue" `
32
-
ReplyTo string `json:"replyTo"`
33
33
CreatedAt int64 `json:"createdAt"`
34
34
}
35
35
36
36
type Store interface {
37
37
CreateIssue(issue Issue) error
38
38
CreateComment(comment Comment) error
39
+
DeleteIssue(did, rkey string) error
40
+
DeleteComment(did, rkey string) error
41
+
DeleteCommentsForIssue(issueURI string) error
42
+
GetUser(did string) (User, error)
43
+
CreateUser(user User) error
39
44
}
40
45
41
46
// JetstreamConsumer is responsible for consuming from a jetstream instance
···
102
107
103
108
switch event.Commit.Operation {
104
109
case models.CommitOperationCreate, models.CommitOperationUpdate:
105
-
return h.handleCreateEvent(ctx, event)
106
-
// TODO: handle deletes too
110
+
return h.handleCreateUpdateEvent(ctx, event)
111
+
case models.CommitOperationDelete:
112
+
return h.handleDeleteEvent(ctx, event)
113
+
default:
114
+
return nil
115
+
}
116
+
}
117
+
118
+
func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
119
+
switch event.Commit.Collection {
120
+
case tangled.RepoIssueNSID:
121
+
h.handleCreateUpdateIssueEvent(ctx, event)
122
+
case tangled.RepoIssueCommentNSID:
123
+
h.handleCreateUpdateIssueCommentEvent(ctx, event)
107
124
default:
125
+
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
108
126
return nil
109
127
}
128
+
129
+
return nil
110
130
}
111
131
112
-
func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error {
132
+
func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
113
133
switch event.Commit.Collection {
114
134
case tangled.RepoIssueNSID:
115
-
h.handleIssueEvent(ctx, event)
135
+
h.handleDeleteIssueEvent(ctx, event)
116
136
case tangled.RepoIssueCommentNSID:
117
-
h.handleIssueCommentEvent(ctx, event)
137
+
h.handleDeleteIssueCommentEvent(ctx, event)
118
138
default:
119
139
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
120
140
return nil
···
123
143
return nil
124
144
}
125
145
126
-
func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) {
146
+
func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
127
147
var issue tangled.RepoIssue
128
148
129
149
err := json.Unmarshal(event.Commit.Record, &issue)
···
162
182
slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
163
183
}
164
184
165
-
func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) {
185
+
func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
166
186
var comment tangled.RepoIssueComment
167
187
168
188
err := json.Unmarshal(event.Commit.Record, &comment)
···
181
201
slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
182
202
createdAt = time.Now().UTC()
183
203
}
184
-
err = h.store.CreateComment(Comment{
185
-
AuthorDID: did,
186
-
RKey: rkey,
187
-
Body: comment.Body,
188
-
Issue: comment.Issue,
189
-
CreatedAt: createdAt.UnixMilli(),
190
-
//ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported
191
-
})
204
+
205
+
// if there is a replyTo present, don't store the comment because replies can't be replied to so
206
+
// the reply comment doesn't need to be stored
207
+
if comment.ReplyTo == nil || *comment.ReplyTo == "" {
208
+
err = h.store.CreateComment(Comment{
209
+
AuthorDID: did,
210
+
RKey: rkey,
211
+
Body: comment.Body,
212
+
Issue: comment.Issue,
213
+
CreatedAt: createdAt.UnixMilli(),
214
+
})
215
+
if err != nil {
216
+
bugsnag.Notify(err)
217
+
slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
218
+
return
219
+
}
220
+
}
221
+
222
+
// TODO: now send a notification to either the issue creator or whoever the comment was a reply to
223
+
didToNotify := getUserToAlert(comment)
224
+
if didToNotify == "" {
225
+
slog.Info("could not work out did to send alert to", "comment", comment)
226
+
return
227
+
}
228
+
229
+
user, err := h.store.GetUser(didToNotify)
230
+
if err != nil {
231
+
slog.Error("getting user to send alert to", "error", err, "did", didToNotify)
232
+
return
233
+
}
234
+
235
+
slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID)
236
+
}
237
+
238
+
func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
239
+
did := event.Did
240
+
rkey := event.Commit.RKey
241
+
242
+
err := h.store.DeleteIssue(did, rkey)
192
243
if err != nil {
193
244
bugsnag.Notify(err)
194
-
slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
245
+
slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
195
246
return
196
247
}
197
248
198
-
// TODO: now send a notification to either the issue creator or whoever the comment was a reply to
249
+
// now attempt to delete any comments on that issue since they can't be replied to now.
250
+
// Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
251
+
// not visible on the UI and so no one will be able to reply to them so this is just a
252
+
// cleanup operation
253
+
issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
254
+
err = h.store.DeleteCommentsForIssue(issueURI)
255
+
if err != nil {
256
+
bugsnag.Notify(err)
257
+
slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
258
+
}
259
+
260
+
slog.Info("deleted issue ", "did", did, "rkey", rkey)
261
+
}
199
262
200
-
slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
263
+
func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
264
+
did := event.Did
265
+
rkey := event.Commit.RKey
266
+
267
+
err := h.store.DeleteComment(did, rkey)
268
+
if err != nil {
269
+
bugsnag.Notify(err)
270
+
slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
271
+
return
272
+
}
273
+
274
+
slog.Info("deleted comment ", "did", did, "rkey", rkey)
275
+
}
276
+
277
+
// at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22
278
+
func getUserToAlert(comment tangled.RepoIssueComment) string {
279
+
if comment.ReplyTo != nil {
280
+
return getDidFromCommentURI(*comment.ReplyTo)
281
+
}
282
+
return getDidFromIssueURI(comment.Issue)
283
+
}
284
+
285
+
func getDidFromCommentURI(uri string) string {
286
+
split := strings.Split(uri, tangled.RepoIssueCommentNSID)
287
+
if len(split) != 2 {
288
+
slog.Error("invalid comment URI received", "uri", uri)
289
+
return ""
290
+
}
291
+
292
+
did := strings.TrimPrefix(split[0], "at://")
293
+
did = strings.TrimSuffix(did, "/")
294
+
295
+
return did
296
+
}
297
+
298
+
func getDidFromIssueURI(uri string) string {
299
+
split := strings.Split(uri, tangled.RepoIssueNSID)
300
+
if len(split) != 2 {
301
+
slog.Error("invalid issue URI received", "uri", uri)
302
+
return ""
303
+
}
304
+
305
+
did := strings.TrimPrefix(split[0], "at://")
306
+
did = strings.TrimSuffix(did, "/")
307
+
308
+
return did
201
309
}
+117
-5
database.go
+117
-5
database.go
···
44
44
return nil, fmt.Errorf("creating comments table: %w", err)
45
45
}
46
46
47
+
err = createUsersTable(db)
48
+
if err != nil {
49
+
return nil, fmt.Errorf("creating users table: %w", err)
50
+
}
51
+
47
52
return &Database{db: db}, nil
48
53
}
49
54
···
104
109
"rkey" TEXT,
105
110
"body" TEXT,
106
111
"issue" TEXT,
107
-
"replyTo" TEXT,
108
112
"createdAt" integer NOT NULL,
109
113
UNIQUE(authorDid,rkey)
110
114
);`
···
123
127
return nil
124
128
}
125
129
130
+
func createUsersTable(db *sql.DB) error {
131
+
createTableSQL := `CREATE TABLE IF NOT EXISTS users (
132
+
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
133
+
"did" TEXT,
134
+
"handle" TEXT,
135
+
"convoId" TEXT,
136
+
"createdAt" integer NOT NULL,
137
+
UNIQUE(did)
138
+
);`
139
+
140
+
slog.Info("Create users table...")
141
+
statement, err := db.Prepare(createTableSQL)
142
+
if err != nil {
143
+
return fmt.Errorf("prepare DB statement to create users table: %w", err)
144
+
}
145
+
_, err = statement.Exec()
146
+
if err != nil {
147
+
return fmt.Errorf("exec sql statement to create users table: %w", err)
148
+
}
149
+
slog.Info("users table created")
150
+
151
+
return nil
152
+
}
153
+
126
154
// CreateIssue will insert a issue into a database
127
155
func (d *Database) CreateIssue(issue Issue) error {
128
156
sql := `REPLACE INTO issues (authorDid, rkey, title, body, repo, createdAt) VALUES (?, ?, ?, ?, ?, ?);`
···
135
163
136
164
// CreateComment will insert a comment into a database
137
165
func (d *Database) CreateComment(comment Comment) error {
138
-
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, replyTo, createdAt) VALUES (?, ?, ?, ?, ?, ?);`
139
-
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.ReplyTo, comment.CreatedAt)
166
+
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, createdAt) VALUES (?, ?, ?, ?, ?);`
167
+
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.CreatedAt)
140
168
if err != nil {
141
169
return fmt.Errorf("exec insert comment: %w", err)
170
+
}
171
+
return nil
172
+
}
173
+
174
+
// CreateUser will insert a user into a database
175
+
func (d *Database) CreateUser(user User) error {
176
+
sql := `REPLACE INTO users (did, handle, convoId, createdAt) VALUES (?, ?, ?, ?);`
177
+
_, err := d.db.Exec(sql, user.DID, user.Handle, user.ConvoID, user.CreatedAt)
178
+
if err != nil {
179
+
return fmt.Errorf("exec insert user: %w", err)
142
180
}
143
181
return nil
144
182
}
···
164
202
}
165
203
166
204
func (d *Database) GetComments() ([]Comment, error) {
167
-
sql := "SELECT authorDid, rkey, body, issue, replyTo, createdAt FROM comments;"
205
+
sql := "SELECT authorDid, rkey, body, issue, createdAt FROM comments;"
168
206
rows, err := d.db.Query(sql)
169
207
if err != nil {
170
208
return nil, fmt.Errorf("run query to get comments: %w", err)
···
174
212
var results []Comment
175
213
for rows.Next() {
176
214
var comment Comment
177
-
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.ReplyTo, &comment.CreatedAt); err != nil {
215
+
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.CreatedAt); err != nil {
178
216
return nil, fmt.Errorf("scan row: %w", err)
179
217
}
180
218
···
182
220
}
183
221
return results, nil
184
222
}
223
+
224
+
func (d *Database) GetUser(did string) (User, error) {
225
+
sql := "SELECT did, handle, convoId, createdAt FROM users WHERE did = ?;"
226
+
rows, err := d.db.Query(sql, did)
227
+
if err != nil {
228
+
return User{}, fmt.Errorf("run query to get user: %w", err)
229
+
}
230
+
defer rows.Close()
231
+
232
+
for rows.Next() {
233
+
var user User
234
+
if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil {
235
+
return User{}, fmt.Errorf("scan row: %w", err)
236
+
}
237
+
238
+
return user, nil
239
+
}
240
+
return User{}, fmt.Errorf("user not found")
241
+
}
242
+
243
+
func (d *Database) GetUsers() ([]User, error) {
244
+
sql := "SELECT did, handle, convoId, createdAt FROM users;"
245
+
rows, err := d.db.Query(sql)
246
+
if err != nil {
247
+
return nil, fmt.Errorf("run query to get user: %w", err)
248
+
}
249
+
defer rows.Close()
250
+
251
+
var results []User
252
+
for rows.Next() {
253
+
var user User
254
+
if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil {
255
+
return nil, fmt.Errorf("scan row: %w", err)
256
+
}
257
+
results = append(results, user)
258
+
}
259
+
return results, nil
260
+
}
261
+
262
+
func (d *Database) DeleteIssue(did, rkey string) error {
263
+
sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;"
264
+
_, err := d.db.Exec(sql, did, rkey)
265
+
if err != nil {
266
+
return fmt.Errorf("exec delete issue: %w", err)
267
+
}
268
+
return nil
269
+
}
270
+
271
+
func (d *Database) DeleteComment(did, rkey string) error {
272
+
sql := "DELETE FROM comments WHERE authorDid = ? AND rkey = ?;"
273
+
_, err := d.db.Exec(sql, did, rkey)
274
+
if err != nil {
275
+
return fmt.Errorf("exec delete issue: %w", err)
276
+
}
277
+
return nil
278
+
}
279
+
280
+
func (d *Database) DeleteCommentsForIssue(issueURI string) error {
281
+
sql := "DELETE FROM comments WHERE issue = ?;"
282
+
_, err := d.db.Exec(sql, issueURI)
283
+
if err != nil {
284
+
return fmt.Errorf("exec delete comments for issue")
285
+
}
286
+
return nil
287
+
}
288
+
289
+
func (d *Database) DeleteUser(did string) error {
290
+
sql := "DELETE FROM users WHERE did = ?;"
291
+
_, err := d.db.Exec(sql, did)
292
+
if err != nil {
293
+
return fmt.Errorf("exec delete user")
294
+
}
295
+
return nil
296
+
}
+441
dm_handler.go
+441
dm_handler.go
···
1
+
package tangledalertbot
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"encoding/json"
7
+
"fmt"
8
+
"io"
9
+
"log/slog"
10
+
"net/http"
11
+
"os"
12
+
"strings"
13
+
"time"
14
+
15
+
"github.com/pkg/errors"
16
+
)
17
+
18
+
const (
19
+
httpClientTimeoutDuration = time.Second * 5
20
+
transportIdleConnTimeoutDuration = time.Second * 90
21
+
baseBskyURL = "https://bsky.social/xrpc"
22
+
)
23
+
24
+
type auth struct {
25
+
AccessJwt string `json:"accessJwt"`
26
+
RefershJWT string `json:"refreshJwt"`
27
+
Did string `json:"did"`
28
+
}
29
+
30
+
type accessData struct {
31
+
handle string
32
+
appPassword string
33
+
}
34
+
35
+
type ListConvosResponse struct {
36
+
Cursor string `json:"cursor"`
37
+
Convos []Convo `json:"convos"`
38
+
}
39
+
40
+
type Convo struct {
41
+
ID string `json:"id"`
42
+
Members []ConvoMember `json:"members"`
43
+
UnreadCount int `json:"unreadCount"`
44
+
}
45
+
46
+
type ConvoMember struct {
47
+
Did string `json:"did"`
48
+
Handle string `json:"handle"`
49
+
}
50
+
51
+
type ErrorResponse struct {
52
+
Error string `json:"error"`
53
+
}
54
+
55
+
type MessageResp struct {
56
+
Messages []Message `json:"messages"`
57
+
Cursor string `json:"cursor"`
58
+
}
59
+
60
+
type Message struct {
61
+
ID string `json:"id"`
62
+
Sender MessageSender `json:"sender"`
63
+
Text string `json:"text"`
64
+
}
65
+
66
+
type MessageSender struct {
67
+
Did string `json:"did"`
68
+
}
69
+
70
+
type UpdateMessageReadRequest struct {
71
+
ConvoID string `json:"convoId"`
72
+
MessageID string `json:"messageId"`
73
+
}
74
+
75
+
type User struct {
76
+
DID string
77
+
Handle string
78
+
ConvoID string
79
+
CreatedAt int
80
+
}
81
+
82
+
type DmService struct {
83
+
httpClient *http.Client
84
+
accessData accessData
85
+
auth auth
86
+
timerDuration time.Duration
87
+
pdsURL string
88
+
store Store
89
+
}
90
+
91
+
func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) {
92
+
httpClient := http.Client{
93
+
Timeout: httpClientTimeoutDuration,
94
+
Transport: &http.Transport{
95
+
IdleConnTimeout: transportIdleConnTimeoutDuration,
96
+
},
97
+
}
98
+
99
+
accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE")
100
+
accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD")
101
+
pdsURL := os.Getenv("MESSAGING_PDS_URL")
102
+
103
+
service := DmService{
104
+
httpClient: &httpClient,
105
+
accessData: accessData{
106
+
handle: accessHandle,
107
+
appPassword: accessAppPassword,
108
+
},
109
+
timerDuration: timerDuration,
110
+
pdsURL: pdsURL,
111
+
store: store,
112
+
}
113
+
114
+
auth, err := service.Authenicate()
115
+
if err != nil {
116
+
return nil, fmt.Errorf("authenticating: %w", err)
117
+
}
118
+
119
+
service.auth = auth
120
+
121
+
return &service, nil
122
+
}
123
+
124
+
func (d *DmService) Start(ctx context.Context) {
125
+
go d.RefreshTask(ctx)
126
+
127
+
timer := time.NewTimer(d.timerDuration)
128
+
defer timer.Stop()
129
+
130
+
for {
131
+
select {
132
+
case <-ctx.Done():
133
+
slog.Warn("context canceled - stopping dm task")
134
+
return
135
+
case <-timer.C:
136
+
err := d.HandleMessageTimer(ctx)
137
+
if err != nil {
138
+
slog.Error("handle message timer", "error", err)
139
+
}
140
+
timer.Reset(d.timerDuration)
141
+
}
142
+
}
143
+
}
144
+
145
+
func (d *DmService) RefreshTask(ctx context.Context) {
146
+
timer := time.NewTimer(time.Hour)
147
+
defer timer.Stop()
148
+
149
+
for {
150
+
select {
151
+
case <-ctx.Done():
152
+
return
153
+
case <-timer.C:
154
+
err := d.RefreshAuthenication(ctx)
155
+
if err != nil {
156
+
slog.Error("handle refresh auth timer", "error", err)
157
+
// TODO: better retry with backoff probably
158
+
timer.Reset(time.Minute)
159
+
continue
160
+
}
161
+
timer.Reset(time.Hour)
162
+
}
163
+
}
164
+
}
165
+
166
+
func (d *DmService) HandleMessageTimer(ctx context.Context) error {
167
+
convoResp, err := d.GetUnreadMessages()
168
+
if err != nil {
169
+
return fmt.Errorf("get unread messages: %w", err)
170
+
}
171
+
172
+
// TODO: handle the cursor pagination
173
+
174
+
for _, convo := range convoResp.Convos {
175
+
if convo.UnreadCount == 0 {
176
+
continue
177
+
}
178
+
179
+
messageResp, err := d.GetMessages(ctx, convo.ID)
180
+
if err != nil {
181
+
slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID)
182
+
continue
183
+
}
184
+
185
+
unreadCount := convo.UnreadCount
186
+
unreadMessages := make([]Message, 0, convo.UnreadCount)
187
+
// TODO: handle cursor pagination
188
+
for _, msg := range messageResp.Messages {
189
+
// TODO: techincally if I get to a message that's from the bot account, then there shouldn't be
190
+
// an more unread messages?
191
+
if msg.Sender.Did == d.auth.Did {
192
+
continue
193
+
}
194
+
195
+
unreadMessages = append(unreadMessages, msg)
196
+
unreadCount--
197
+
if unreadCount == 0 {
198
+
break
199
+
}
200
+
}
201
+
202
+
for _, msg := range unreadMessages {
203
+
d.handleMessage(msg, convo)
204
+
205
+
err = d.MarkMessageRead(msg.ID, convo.ID)
206
+
if err != nil {
207
+
slog.Error("marking message read", "error", err)
208
+
continue
209
+
}
210
+
}
211
+
}
212
+
213
+
return nil
214
+
}
215
+
216
+
func (d *DmService) handleMessage(msg Message, convo Convo) {
217
+
// TODO: add or remote user the list of "subsribed" users
218
+
if strings.ToLower(msg.Text) == "subscribe" {
219
+
userHandle := ""
220
+
for _, member := range convo.Members {
221
+
if member.Did == msg.Sender.Did {
222
+
userHandle = member.Handle
223
+
break
224
+
}
225
+
}
226
+
227
+
if userHandle == "" {
228
+
slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members)
229
+
return
230
+
}
231
+
232
+
user := User{
233
+
DID: msg.Sender.Did,
234
+
ConvoID: convo.ID,
235
+
Handle: userHandle,
236
+
CreatedAt: int(time.Now().UnixMilli()),
237
+
}
238
+
239
+
err := d.store.CreateUser(user)
240
+
if err != nil {
241
+
slog.Error("error creating user", "error", err, "user", user)
242
+
return
243
+
}
244
+
}
245
+
}
246
+
247
+
func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) {
248
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL)
249
+
request, err := http.NewRequest("GET", url, nil)
250
+
if err != nil {
251
+
return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err)
252
+
}
253
+
254
+
request.Header.Add("Content-Type", "application/json")
255
+
request.Header.Add("Accept", "application/json")
256
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
257
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
258
+
259
+
resp, err := d.httpClient.Do(request)
260
+
if err != nil {
261
+
return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err)
262
+
}
263
+
defer resp.Body.Close()
264
+
265
+
if resp.StatusCode != http.StatusOK {
266
+
var errorResp ErrorResponse
267
+
err = decodeResp(resp.Body, &errorResp)
268
+
if err != nil {
269
+
return ListConvosResponse{}, err
270
+
}
271
+
272
+
return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
273
+
}
274
+
275
+
var listConvoResp ListConvosResponse
276
+
err = decodeResp(resp.Body, &listConvoResp)
277
+
if err != nil {
278
+
return ListConvosResponse{}, err
279
+
}
280
+
281
+
return listConvoResp, nil
282
+
}
283
+
284
+
func (d *DmService) MarkMessageRead(messageID, convoID string) error {
285
+
bodyReq := UpdateMessageReadRequest{
286
+
ConvoID: convoID,
287
+
MessageID: messageID,
288
+
}
289
+
290
+
bodyB, err := json.Marshal(bodyReq)
291
+
if err != nil {
292
+
return fmt.Errorf("marshal update message request body: %w", err)
293
+
}
294
+
295
+
r := bytes.NewReader(bodyB)
296
+
297
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL)
298
+
request, err := http.NewRequest("POST", url, r)
299
+
if err != nil {
300
+
return fmt.Errorf("create new list convos http request: %w", err)
301
+
}
302
+
303
+
request.Header.Add("Content-Type", "application/json")
304
+
request.Header.Add("Accept", "application/json")
305
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
306
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
307
+
308
+
resp, err := d.httpClient.Do(request)
309
+
if err != nil {
310
+
return fmt.Errorf("do http request to update message read: %w", err)
311
+
}
312
+
defer resp.Body.Close()
313
+
314
+
if resp.StatusCode == http.StatusOK {
315
+
return nil
316
+
}
317
+
318
+
var errorResp ErrorResponse
319
+
err = decodeResp(resp.Body, &errorResp)
320
+
if err != nil {
321
+
return err
322
+
}
323
+
324
+
return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
325
+
326
+
}
327
+
328
+
func (d *DmService) Authenicate() (auth, error) {
329
+
url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL)
330
+
331
+
requestData := map[string]interface{}{
332
+
"identifier": d.accessData.handle,
333
+
"password": d.accessData.appPassword,
334
+
}
335
+
336
+
data, err := json.Marshal(requestData)
337
+
if err != nil {
338
+
return auth{}, errors.Wrap(err, "failed to marshal request")
339
+
}
340
+
341
+
r := bytes.NewReader(data)
342
+
343
+
request, err := http.NewRequest("POST", url, r)
344
+
if err != nil {
345
+
return auth{}, errors.Wrap(err, "failed to create request")
346
+
}
347
+
348
+
request.Header.Add("Content-Type", "application/json")
349
+
350
+
resp, err := d.httpClient.Do(request)
351
+
if err != nil {
352
+
return auth{}, errors.Wrap(err, "failed to make request")
353
+
}
354
+
defer resp.Body.Close()
355
+
356
+
var loginResp auth
357
+
err = decodeResp(resp.Body, &loginResp)
358
+
if err != nil {
359
+
return auth{}, err
360
+
}
361
+
362
+
return loginResp, nil
363
+
}
364
+
365
+
func (d *DmService) RefreshAuthenication(ctx context.Context) error {
366
+
url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL)
367
+
368
+
request, err := http.NewRequest("POST", url, nil)
369
+
if err != nil {
370
+
return errors.Wrap(err, "failed to create request")
371
+
}
372
+
373
+
request.Header.Add("Content-Type", "application/json")
374
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT))
375
+
376
+
resp, err := d.httpClient.Do(request)
377
+
if err != nil {
378
+
return errors.Wrap(err, "failed to make request")
379
+
}
380
+
defer resp.Body.Close()
381
+
382
+
var loginResp auth
383
+
err = decodeResp(resp.Body, &loginResp)
384
+
if err != nil {
385
+
return err
386
+
}
387
+
388
+
d.auth = loginResp
389
+
390
+
return nil
391
+
}
392
+
393
+
func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) {
394
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID)
395
+
request, err := http.NewRequest("GET", url, nil)
396
+
if err != nil {
397
+
return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err)
398
+
}
399
+
400
+
request.Header.Add("Content-Type", "application/json")
401
+
request.Header.Add("Accept", "application/json")
402
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
403
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
404
+
405
+
resp, err := d.httpClient.Do(request)
406
+
if err != nil {
407
+
return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err)
408
+
}
409
+
defer resp.Body.Close()
410
+
411
+
if resp.StatusCode != http.StatusOK {
412
+
var errorResp ErrorResponse
413
+
err = decodeResp(resp.Body, &errorResp)
414
+
if err != nil {
415
+
return MessageResp{}, err
416
+
}
417
+
418
+
return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
419
+
}
420
+
421
+
var messageResp MessageResp
422
+
err = decodeResp(resp.Body, &messageResp)
423
+
if err != nil {
424
+
return MessageResp{}, err
425
+
}
426
+
427
+
return messageResp, nil
428
+
}
429
+
430
+
func decodeResp(body io.Reader, result any) error {
431
+
resBody, err := io.ReadAll(body)
432
+
if err != nil {
433
+
return errors.Wrap(err, "failed to read response")
434
+
}
435
+
436
+
err = json.Unmarshal(resBody, result)
437
+
if err != nil {
438
+
return errors.Wrap(err, "failed to unmarshal response")
439
+
}
440
+
return nil
441
+
}
+1
-1
go.mod
+1
-1
go.mod
···
8
8
github.com/bugsnag/bugsnag-go v2.6.2+incompatible
9
9
github.com/glebarez/go-sqlite v1.22.0
10
10
github.com/joho/godotenv v1.5.1
11
-
tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98
11
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2
12
12
)
13
13
14
14
require (
+4
-2
go.sum
+4
-2
go.sum
···
103
103
modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
104
104
modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ=
105
105
modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0=
106
-
tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98 h1:WovrwwBufU89zoSaStoc6+qyUTEB/LxhUCM1MqGEUwU=
107
-
tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98/go.mod h1:zXmPB9VMsPWpJ6Y51PWnzB1fL3w69P0IhR9rTXIfGPY=
106
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2 h1:4bcQewZPzb7WfCuUPf4MPVWb04JiTbjbShcg5ONi9co=
107
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=
108
+
tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d h1:DmdCyK+BZDYitJy6TdqTwvcci2EVYgDu2+LR853nyls=
109
+
tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=