this repo has no description

Compare changes

Choose any two refs to compare.

+28 -2
cmd/main.go
··· 12 12 "os/signal" 13 13 "path" 14 14 "syscall" 15 + "time" 15 16 16 17 tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot" 17 18 ··· 56 57 } 57 58 defer database.Close() 58 59 60 + dmService, err := tangledalertbot.NewDmService(database, time.Second*30) 61 + if err != nil { 62 + return fmt.Errorf("create dm service: %w", err) 63 + } 64 + 59 65 ctx, cancel := context.WithCancel(context.Background()) 60 66 defer cancel() 61 67 62 68 go consumeLoop(ctx, database) 63 69 64 70 go startHttpServer(ctx, database) 71 + 72 + go dmService.Start(ctx) 65 73 66 74 <-signals 67 75 cancel() ··· 86 94 if errors.Is(err, context.Canceled) { 87 95 return nil 88 96 } 89 - slog.Error("consume loop", "error", err) 90 - bugsnag.Notify(err) 91 97 return err 92 98 } 93 99 return nil ··· 103 109 mux := http.NewServeMux() 104 110 mux.HandleFunc("/issues", srv.handleListIssues) 105 111 mux.HandleFunc("/comments", srv.handleListComments) 112 + mux.HandleFunc("/users", srv.handleListUsers) 106 113 107 114 err := http.ListenAndServe(":3000", mux) 108 115 if err != nil { ··· 151 158 w.Header().Set("Content-Type", "application/json") 152 159 w.Write(b) 153 160 } 161 + 162 + func (s *server) handleListUsers(w http.ResponseWriter, r *http.Request) { 163 + users, err := s.db.GetUsers() 164 + if err != nil { 165 + slog.Error("getting users from DB", "error", err) 166 + http.Error(w, "error getting users from DB", http.StatusInternalServerError) 167 + return 168 + } 169 + 170 + b, err := json.Marshal(users) 171 + if err != nil { 172 + slog.Error("marshalling users from DB", "error", err) 173 + http.Error(w, "marshalling users from DB", http.StatusInternalServerError) 174 + return 175 + } 176 + 177 + w.Header().Set("Content-Type", "application/json") 178 + w.Write(b) 179 + }
+128 -20
consumer.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "strings" 6 7 7 8 "fmt" 8 9 "log/slog" ··· 12 13 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 14 "github.com/bluesky-social/jetstream/pkg/models" 14 15 "github.com/bugsnag/bugsnag-go" 15 - "tangled.sh/tangled.sh/core/api/tangled" 16 + "tangled.org/core/api/tangled" 16 17 ) 17 18 18 19 type Issue struct { ··· 29 30 RKey string `json:"rkey"` 30 31 Body string `json:"body"` 31 32 Issue string `json:"issue" ` 32 - ReplyTo string `json:"replyTo"` 33 33 CreatedAt int64 `json:"createdAt"` 34 34 } 35 35 36 36 type Store interface { 37 37 CreateIssue(issue Issue) error 38 38 CreateComment(comment Comment) error 39 + DeleteIssue(did, rkey string) error 40 + DeleteComment(did, rkey string) error 41 + DeleteCommentsForIssue(issueURI string) error 42 + GetUser(did string) (User, error) 43 + CreateUser(user User) error 39 44 } 40 45 41 46 // JetstreamConsumer is responsible for consuming from a jetstream instance ··· 102 107 103 108 switch event.Commit.Operation { 104 109 case models.CommitOperationCreate, models.CommitOperationUpdate: 105 - return h.handleCreateEvent(ctx, event) 106 - // TODO: handle deletes too 110 + return h.handleCreateUpdateEvent(ctx, event) 111 + case models.CommitOperationDelete: 112 + return h.handleDeleteEvent(ctx, event) 113 + default: 114 + return nil 115 + } 116 + } 117 + 118 + func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error { 119 + switch event.Commit.Collection { 120 + case tangled.RepoIssueNSID: 121 + h.handleCreateUpdateIssueEvent(ctx, event) 122 + case tangled.RepoIssueCommentNSID: 123 + h.handleCreateUpdateIssueCommentEvent(ctx, event) 107 124 default: 125 + slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 108 126 return nil 109 127 } 128 + 129 + return nil 110 130 } 111 131 112 - func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error { 132 + func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error { 113 133 switch event.Commit.Collection { 114 134 case tangled.RepoIssueNSID: 115 - h.handleIssueEvent(ctx, event) 135 + h.handleDeleteIssueEvent(ctx, event) 116 136 case tangled.RepoIssueCommentNSID: 117 - h.handleIssueCommentEvent(ctx, event) 137 + h.handleDeleteIssueCommentEvent(ctx, event) 118 138 default: 119 139 slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection) 120 140 return nil ··· 123 143 return nil 124 144 } 125 145 126 - func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) { 146 + func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) { 127 147 var issue tangled.RepoIssue 128 148 129 149 err := json.Unmarshal(event.Commit.Record, &issue) ··· 162 182 slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey) 163 183 } 164 184 165 - func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) { 185 + func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) { 166 186 var comment tangled.RepoIssueComment 167 187 168 188 err := json.Unmarshal(event.Commit.Record, &comment) ··· 181 201 slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt) 182 202 createdAt = time.Now().UTC() 183 203 } 184 - err = h.store.CreateComment(Comment{ 185 - AuthorDID: did, 186 - RKey: rkey, 187 - Body: comment.Body, 188 - Issue: comment.Issue, 189 - CreatedAt: createdAt.UnixMilli(), 190 - //ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported 191 - }) 204 + 205 + // if there is a replyTo present, don't store the comment because replies can't be replied to so 206 + // the reply comment doesn't need to be stored 207 + if comment.ReplyTo == nil || *comment.ReplyTo == "" { 208 + err = h.store.CreateComment(Comment{ 209 + AuthorDID: did, 210 + RKey: rkey, 211 + Body: comment.Body, 212 + Issue: comment.Issue, 213 + CreatedAt: createdAt.UnixMilli(), 214 + }) 215 + if err != nil { 216 + bugsnag.Notify(err) 217 + slog.Error("create comment", "error", err, "did", did, "rkey", rkey) 218 + return 219 + } 220 + } 221 + 222 + // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 223 + didToNotify := getUserToAlert(comment) 224 + if didToNotify == "" { 225 + slog.Info("could not work out did to send alert to", "comment", comment) 226 + return 227 + } 228 + 229 + user, err := h.store.GetUser(didToNotify) 230 + if err != nil { 231 + slog.Error("getting user to send alert to", "error", err, "did", didToNotify) 232 + return 233 + } 234 + 235 + slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID) 236 + } 237 + 238 + func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) { 239 + did := event.Did 240 + rkey := event.Commit.RKey 241 + 242 + err := h.store.DeleteIssue(did, rkey) 192 243 if err != nil { 193 244 bugsnag.Notify(err) 194 - slog.Error("create comment", "error", err, "did", did, "rkey", rkey) 245 + slog.Error("delete issue", "error", err, "did", did, "rkey", rkey) 195 246 return 196 247 } 197 248 198 - // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 249 + // now attempt to delete any comments on that issue since they can't be replied to now. 250 + // Note: if unsuccessful it doesn't matter because a deleted issue and its comments are 251 + // not visible on the UI and so no one will be able to reply to them so this is just a 252 + // cleanup operation 253 + issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey) 254 + err = h.store.DeleteCommentsForIssue(issueURI) 255 + if err != nil { 256 + bugsnag.Notify(err) 257 + slog.Error("delete comments for issue", "error", err, "issue URI", issueURI) 258 + } 259 + 260 + slog.Info("deleted issue ", "did", did, "rkey", rkey) 261 + } 199 262 200 - slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) 263 + func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) { 264 + did := event.Did 265 + rkey := event.Commit.RKey 266 + 267 + err := h.store.DeleteComment(did, rkey) 268 + if err != nil { 269 + bugsnag.Notify(err) 270 + slog.Error("delete comment", "error", err, "did", did, "rkey", rkey) 271 + return 272 + } 273 + 274 + slog.Info("deleted comment ", "did", did, "rkey", rkey) 275 + } 276 + 277 + // at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22 278 + func getUserToAlert(comment tangled.RepoIssueComment) string { 279 + if comment.ReplyTo != nil { 280 + return getDidFromCommentURI(*comment.ReplyTo) 281 + } 282 + return getDidFromIssueURI(comment.Issue) 283 + } 284 + 285 + func getDidFromCommentURI(uri string) string { 286 + split := strings.Split(uri, tangled.RepoIssueCommentNSID) 287 + if len(split) != 2 { 288 + slog.Error("invalid comment URI received", "uri", uri) 289 + return "" 290 + } 291 + 292 + did := strings.TrimPrefix(split[0], "at://") 293 + did = strings.TrimSuffix(did, "/") 294 + 295 + return did 296 + } 297 + 298 + func getDidFromIssueURI(uri string) string { 299 + split := strings.Split(uri, tangled.RepoIssueNSID) 300 + if len(split) != 2 { 301 + slog.Error("invalid issue URI received", "uri", uri) 302 + return "" 303 + } 304 + 305 + did := strings.TrimPrefix(split[0], "at://") 306 + did = strings.TrimSuffix(did, "/") 307 + 308 + return did 201 309 }
+117 -5
database.go
··· 44 44 return nil, fmt.Errorf("creating comments table: %w", err) 45 45 } 46 46 47 + err = createUsersTable(db) 48 + if err != nil { 49 + return nil, fmt.Errorf("creating users table: %w", err) 50 + } 51 + 47 52 return &Database{db: db}, nil 48 53 } 49 54 ··· 104 109 "rkey" TEXT, 105 110 "body" TEXT, 106 111 "issue" TEXT, 107 - "replyTo" TEXT, 108 112 "createdAt" integer NOT NULL, 109 113 UNIQUE(authorDid,rkey) 110 114 );` ··· 123 127 return nil 124 128 } 125 129 130 + func createUsersTable(db *sql.DB) error { 131 + createTableSQL := `CREATE TABLE IF NOT EXISTS users ( 132 + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 133 + "did" TEXT, 134 + "handle" TEXT, 135 + "convoId" TEXT, 136 + "createdAt" integer NOT NULL, 137 + UNIQUE(did) 138 + );` 139 + 140 + slog.Info("Create users table...") 141 + statement, err := db.Prepare(createTableSQL) 142 + if err != nil { 143 + return fmt.Errorf("prepare DB statement to create users table: %w", err) 144 + } 145 + _, err = statement.Exec() 146 + if err != nil { 147 + return fmt.Errorf("exec sql statement to create users table: %w", err) 148 + } 149 + slog.Info("users table created") 150 + 151 + return nil 152 + } 153 + 126 154 // CreateIssue will insert a issue into a database 127 155 func (d *Database) CreateIssue(issue Issue) error { 128 156 sql := `REPLACE INTO issues (authorDid, rkey, title, body, repo, createdAt) VALUES (?, ?, ?, ?, ?, ?);` ··· 135 163 136 164 // CreateComment will insert a comment into a database 137 165 func (d *Database) CreateComment(comment Comment) error { 138 - sql := `REPLACE INTO comments (authorDid, rkey, body, issue, replyTo, createdAt) VALUES (?, ?, ?, ?, ?, ?);` 139 - _, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.ReplyTo, comment.CreatedAt) 166 + sql := `REPLACE INTO comments (authorDid, rkey, body, issue, createdAt) VALUES (?, ?, ?, ?, ?);` 167 + _, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.CreatedAt) 140 168 if err != nil { 141 169 return fmt.Errorf("exec insert comment: %w", err) 170 + } 171 + return nil 172 + } 173 + 174 + // CreateUser will insert a user into a database 175 + func (d *Database) CreateUser(user User) error { 176 + sql := `REPLACE INTO users (did, handle, convoId, createdAt) VALUES (?, ?, ?, ?);` 177 + _, err := d.db.Exec(sql, user.DID, user.Handle, user.ConvoID, user.CreatedAt) 178 + if err != nil { 179 + return fmt.Errorf("exec insert user: %w", err) 142 180 } 143 181 return nil 144 182 } ··· 164 202 } 165 203 166 204 func (d *Database) GetComments() ([]Comment, error) { 167 - sql := "SELECT authorDid, rkey, body, issue, replyTo, createdAt FROM comments;" 205 + sql := "SELECT authorDid, rkey, body, issue, createdAt FROM comments;" 168 206 rows, err := d.db.Query(sql) 169 207 if err != nil { 170 208 return nil, fmt.Errorf("run query to get comments: %w", err) ··· 174 212 var results []Comment 175 213 for rows.Next() { 176 214 var comment Comment 177 - if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.ReplyTo, &comment.CreatedAt); err != nil { 215 + if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.CreatedAt); err != nil { 178 216 return nil, fmt.Errorf("scan row: %w", err) 179 217 } 180 218 ··· 182 220 } 183 221 return results, nil 184 222 } 223 + 224 + func (d *Database) GetUser(did string) (User, error) { 225 + sql := "SELECT did, handle, convoId, createdAt FROM users WHERE did = ?;" 226 + rows, err := d.db.Query(sql, did) 227 + if err != nil { 228 + return User{}, fmt.Errorf("run query to get user: %w", err) 229 + } 230 + defer rows.Close() 231 + 232 + for rows.Next() { 233 + var user User 234 + if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil { 235 + return User{}, fmt.Errorf("scan row: %w", err) 236 + } 237 + 238 + return user, nil 239 + } 240 + return User{}, fmt.Errorf("user not found") 241 + } 242 + 243 + func (d *Database) GetUsers() ([]User, error) { 244 + sql := "SELECT did, handle, convoId, createdAt FROM users;" 245 + rows, err := d.db.Query(sql) 246 + if err != nil { 247 + return nil, fmt.Errorf("run query to get user: %w", err) 248 + } 249 + defer rows.Close() 250 + 251 + var results []User 252 + for rows.Next() { 253 + var user User 254 + if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil { 255 + return nil, fmt.Errorf("scan row: %w", err) 256 + } 257 + results = append(results, user) 258 + } 259 + return results, nil 260 + } 261 + 262 + func (d *Database) DeleteIssue(did, rkey string) error { 263 + sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;" 264 + _, err := d.db.Exec(sql, did, rkey) 265 + if err != nil { 266 + return fmt.Errorf("exec delete issue: %w", err) 267 + } 268 + return nil 269 + } 270 + 271 + func (d *Database) DeleteComment(did, rkey string) error { 272 + sql := "DELETE FROM comments WHERE authorDid = ? AND rkey = ?;" 273 + _, err := d.db.Exec(sql, did, rkey) 274 + if err != nil { 275 + return fmt.Errorf("exec delete issue: %w", err) 276 + } 277 + return nil 278 + } 279 + 280 + func (d *Database) DeleteCommentsForIssue(issueURI string) error { 281 + sql := "DELETE FROM comments WHERE issue = ?;" 282 + _, err := d.db.Exec(sql, issueURI) 283 + if err != nil { 284 + return fmt.Errorf("exec delete comments for issue") 285 + } 286 + return nil 287 + } 288 + 289 + func (d *Database) DeleteUser(did string) error { 290 + sql := "DELETE FROM users WHERE did = ?;" 291 + _, err := d.db.Exec(sql, did) 292 + if err != nil { 293 + return fmt.Errorf("exec delete user") 294 + } 295 + return nil 296 + }
+441
dm_handler.go
··· 1 + package tangledalertbot 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "os" 12 + "strings" 13 + "time" 14 + 15 + "github.com/pkg/errors" 16 + ) 17 + 18 + const ( 19 + httpClientTimeoutDuration = time.Second * 5 20 + transportIdleConnTimeoutDuration = time.Second * 90 21 + baseBskyURL = "https://bsky.social/xrpc" 22 + ) 23 + 24 + type auth struct { 25 + AccessJwt string `json:"accessJwt"` 26 + RefershJWT string `json:"refreshJwt"` 27 + Did string `json:"did"` 28 + } 29 + 30 + type accessData struct { 31 + handle string 32 + appPassword string 33 + } 34 + 35 + type ListConvosResponse struct { 36 + Cursor string `json:"cursor"` 37 + Convos []Convo `json:"convos"` 38 + } 39 + 40 + type Convo struct { 41 + ID string `json:"id"` 42 + Members []ConvoMember `json:"members"` 43 + UnreadCount int `json:"unreadCount"` 44 + } 45 + 46 + type ConvoMember struct { 47 + Did string `json:"did"` 48 + Handle string `json:"handle"` 49 + } 50 + 51 + type ErrorResponse struct { 52 + Error string `json:"error"` 53 + } 54 + 55 + type MessageResp struct { 56 + Messages []Message `json:"messages"` 57 + Cursor string `json:"cursor"` 58 + } 59 + 60 + type Message struct { 61 + ID string `json:"id"` 62 + Sender MessageSender `json:"sender"` 63 + Text string `json:"text"` 64 + } 65 + 66 + type MessageSender struct { 67 + Did string `json:"did"` 68 + } 69 + 70 + type UpdateMessageReadRequest struct { 71 + ConvoID string `json:"convoId"` 72 + MessageID string `json:"messageId"` 73 + } 74 + 75 + type User struct { 76 + DID string 77 + Handle string 78 + ConvoID string 79 + CreatedAt int 80 + } 81 + 82 + type DmService struct { 83 + httpClient *http.Client 84 + accessData accessData 85 + auth auth 86 + timerDuration time.Duration 87 + pdsURL string 88 + store Store 89 + } 90 + 91 + func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) { 92 + httpClient := http.Client{ 93 + Timeout: httpClientTimeoutDuration, 94 + Transport: &http.Transport{ 95 + IdleConnTimeout: transportIdleConnTimeoutDuration, 96 + }, 97 + } 98 + 99 + accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE") 100 + accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD") 101 + pdsURL := os.Getenv("MESSAGING_PDS_URL") 102 + 103 + service := DmService{ 104 + httpClient: &httpClient, 105 + accessData: accessData{ 106 + handle: accessHandle, 107 + appPassword: accessAppPassword, 108 + }, 109 + timerDuration: timerDuration, 110 + pdsURL: pdsURL, 111 + store: store, 112 + } 113 + 114 + auth, err := service.Authenicate() 115 + if err != nil { 116 + return nil, fmt.Errorf("authenticating: %w", err) 117 + } 118 + 119 + service.auth = auth 120 + 121 + return &service, nil 122 + } 123 + 124 + func (d *DmService) Start(ctx context.Context) { 125 + go d.RefreshTask(ctx) 126 + 127 + timer := time.NewTimer(d.timerDuration) 128 + defer timer.Stop() 129 + 130 + for { 131 + select { 132 + case <-ctx.Done(): 133 + slog.Warn("context canceled - stopping dm task") 134 + return 135 + case <-timer.C: 136 + err := d.HandleMessageTimer(ctx) 137 + if err != nil { 138 + slog.Error("handle message timer", "error", err) 139 + } 140 + timer.Reset(d.timerDuration) 141 + } 142 + } 143 + } 144 + 145 + func (d *DmService) RefreshTask(ctx context.Context) { 146 + timer := time.NewTimer(time.Hour) 147 + defer timer.Stop() 148 + 149 + for { 150 + select { 151 + case <-ctx.Done(): 152 + return 153 + case <-timer.C: 154 + err := d.RefreshAuthenication(ctx) 155 + if err != nil { 156 + slog.Error("handle refresh auth timer", "error", err) 157 + // TODO: better retry with backoff probably 158 + timer.Reset(time.Minute) 159 + continue 160 + } 161 + timer.Reset(time.Hour) 162 + } 163 + } 164 + } 165 + 166 + func (d *DmService) HandleMessageTimer(ctx context.Context) error { 167 + convoResp, err := d.GetUnreadMessages() 168 + if err != nil { 169 + return fmt.Errorf("get unread messages: %w", err) 170 + } 171 + 172 + // TODO: handle the cursor pagination 173 + 174 + for _, convo := range convoResp.Convos { 175 + if convo.UnreadCount == 0 { 176 + continue 177 + } 178 + 179 + messageResp, err := d.GetMessages(ctx, convo.ID) 180 + if err != nil { 181 + slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID) 182 + continue 183 + } 184 + 185 + unreadCount := convo.UnreadCount 186 + unreadMessages := make([]Message, 0, convo.UnreadCount) 187 + // TODO: handle cursor pagination 188 + for _, msg := range messageResp.Messages { 189 + // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be 190 + // an more unread messages? 191 + if msg.Sender.Did == d.auth.Did { 192 + continue 193 + } 194 + 195 + unreadMessages = append(unreadMessages, msg) 196 + unreadCount-- 197 + if unreadCount == 0 { 198 + break 199 + } 200 + } 201 + 202 + for _, msg := range unreadMessages { 203 + d.handleMessage(msg, convo) 204 + 205 + err = d.MarkMessageRead(msg.ID, convo.ID) 206 + if err != nil { 207 + slog.Error("marking message read", "error", err) 208 + continue 209 + } 210 + } 211 + } 212 + 213 + return nil 214 + } 215 + 216 + func (d *DmService) handleMessage(msg Message, convo Convo) { 217 + // TODO: add or remote user the list of "subsribed" users 218 + if strings.ToLower(msg.Text) == "subscribe" { 219 + userHandle := "" 220 + for _, member := range convo.Members { 221 + if member.Did == msg.Sender.Did { 222 + userHandle = member.Handle 223 + break 224 + } 225 + } 226 + 227 + if userHandle == "" { 228 + slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members) 229 + return 230 + } 231 + 232 + user := User{ 233 + DID: msg.Sender.Did, 234 + ConvoID: convo.ID, 235 + Handle: userHandle, 236 + CreatedAt: int(time.Now().UnixMilli()), 237 + } 238 + 239 + err := d.store.CreateUser(user) 240 + if err != nil { 241 + slog.Error("error creating user", "error", err, "user", user) 242 + return 243 + } 244 + } 245 + } 246 + 247 + func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) { 248 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL) 249 + request, err := http.NewRequest("GET", url, nil) 250 + if err != nil { 251 + return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err) 252 + } 253 + 254 + request.Header.Add("Content-Type", "application/json") 255 + request.Header.Add("Accept", "application/json") 256 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 257 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 258 + 259 + resp, err := d.httpClient.Do(request) 260 + if err != nil { 261 + return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err) 262 + } 263 + defer resp.Body.Close() 264 + 265 + if resp.StatusCode != http.StatusOK { 266 + var errorResp ErrorResponse 267 + err = decodeResp(resp.Body, &errorResp) 268 + if err != nil { 269 + return ListConvosResponse{}, err 270 + } 271 + 272 + return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 273 + } 274 + 275 + var listConvoResp ListConvosResponse 276 + err = decodeResp(resp.Body, &listConvoResp) 277 + if err != nil { 278 + return ListConvosResponse{}, err 279 + } 280 + 281 + return listConvoResp, nil 282 + } 283 + 284 + func (d *DmService) MarkMessageRead(messageID, convoID string) error { 285 + bodyReq := UpdateMessageReadRequest{ 286 + ConvoID: convoID, 287 + MessageID: messageID, 288 + } 289 + 290 + bodyB, err := json.Marshal(bodyReq) 291 + if err != nil { 292 + return fmt.Errorf("marshal update message request body: %w", err) 293 + } 294 + 295 + r := bytes.NewReader(bodyB) 296 + 297 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL) 298 + request, err := http.NewRequest("POST", url, r) 299 + if err != nil { 300 + return fmt.Errorf("create new list convos http request: %w", err) 301 + } 302 + 303 + request.Header.Add("Content-Type", "application/json") 304 + request.Header.Add("Accept", "application/json") 305 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 306 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 307 + 308 + resp, err := d.httpClient.Do(request) 309 + if err != nil { 310 + return fmt.Errorf("do http request to update message read: %w", err) 311 + } 312 + defer resp.Body.Close() 313 + 314 + if resp.StatusCode == http.StatusOK { 315 + return nil 316 + } 317 + 318 + var errorResp ErrorResponse 319 + err = decodeResp(resp.Body, &errorResp) 320 + if err != nil { 321 + return err 322 + } 323 + 324 + return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 325 + 326 + } 327 + 328 + func (d *DmService) Authenicate() (auth, error) { 329 + url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL) 330 + 331 + requestData := map[string]interface{}{ 332 + "identifier": d.accessData.handle, 333 + "password": d.accessData.appPassword, 334 + } 335 + 336 + data, err := json.Marshal(requestData) 337 + if err != nil { 338 + return auth{}, errors.Wrap(err, "failed to marshal request") 339 + } 340 + 341 + r := bytes.NewReader(data) 342 + 343 + request, err := http.NewRequest("POST", url, r) 344 + if err != nil { 345 + return auth{}, errors.Wrap(err, "failed to create request") 346 + } 347 + 348 + request.Header.Add("Content-Type", "application/json") 349 + 350 + resp, err := d.httpClient.Do(request) 351 + if err != nil { 352 + return auth{}, errors.Wrap(err, "failed to make request") 353 + } 354 + defer resp.Body.Close() 355 + 356 + var loginResp auth 357 + err = decodeResp(resp.Body, &loginResp) 358 + if err != nil { 359 + return auth{}, err 360 + } 361 + 362 + return loginResp, nil 363 + } 364 + 365 + func (d *DmService) RefreshAuthenication(ctx context.Context) error { 366 + url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL) 367 + 368 + request, err := http.NewRequest("POST", url, nil) 369 + if err != nil { 370 + return errors.Wrap(err, "failed to create request") 371 + } 372 + 373 + request.Header.Add("Content-Type", "application/json") 374 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT)) 375 + 376 + resp, err := d.httpClient.Do(request) 377 + if err != nil { 378 + return errors.Wrap(err, "failed to make request") 379 + } 380 + defer resp.Body.Close() 381 + 382 + var loginResp auth 383 + err = decodeResp(resp.Body, &loginResp) 384 + if err != nil { 385 + return err 386 + } 387 + 388 + d.auth = loginResp 389 + 390 + return nil 391 + } 392 + 393 + func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) { 394 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID) 395 + request, err := http.NewRequest("GET", url, nil) 396 + if err != nil { 397 + return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err) 398 + } 399 + 400 + request.Header.Add("Content-Type", "application/json") 401 + request.Header.Add("Accept", "application/json") 402 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 403 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 404 + 405 + resp, err := d.httpClient.Do(request) 406 + if err != nil { 407 + return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err) 408 + } 409 + defer resp.Body.Close() 410 + 411 + if resp.StatusCode != http.StatusOK { 412 + var errorResp ErrorResponse 413 + err = decodeResp(resp.Body, &errorResp) 414 + if err != nil { 415 + return MessageResp{}, err 416 + } 417 + 418 + return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 419 + } 420 + 421 + var messageResp MessageResp 422 + err = decodeResp(resp.Body, &messageResp) 423 + if err != nil { 424 + return MessageResp{}, err 425 + } 426 + 427 + return messageResp, nil 428 + } 429 + 430 + func decodeResp(body io.Reader, result any) error { 431 + resBody, err := io.ReadAll(body) 432 + if err != nil { 433 + return errors.Wrap(err, "failed to read response") 434 + } 435 + 436 + err = json.Unmarshal(resBody, result) 437 + if err != nil { 438 + return errors.Wrap(err, "failed to unmarshal response") 439 + } 440 + return nil 441 + }
+1 -1
go.mod
··· 8 8 github.com/bugsnag/bugsnag-go v2.6.2+incompatible 9 9 github.com/glebarez/go-sqlite v1.22.0 10 10 github.com/joho/godotenv v1.5.1 11 - tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98 11 + tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2 12 12 ) 13 13 14 14 require (
+4 -2
go.sum
··· 103 103 modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= 104 104 modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ= 105 105 modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0= 106 - tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98 h1:WovrwwBufU89zoSaStoc6+qyUTEB/LxhUCM1MqGEUwU= 107 - tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98/go.mod h1:zXmPB9VMsPWpJ6Y51PWnzB1fL3w69P0IhR9rTXIfGPY= 106 + tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2 h1:4bcQewZPzb7WfCuUPf4MPVWb04JiTbjbShcg5ONi9co= 107 + tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y= 108 + tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d h1:DmdCyK+BZDYitJy6TdqTwvcci2EVYgDu2+LR853nyls= 109 + tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=