-2
cmd/main.go
-2
cmd/main.go
+68
-8
consumer.go
+68
-8
consumer.go
···
29
RKey string `json:"rkey"`
30
Body string `json:"body"`
31
Issue string `json:"issue" `
32
-
ReplyTo string `json:"replyTo"`
33
CreatedAt int64 `json:"createdAt"`
34
}
35
36
type Store interface {
37
CreateIssue(issue Issue) error
38
CreateComment(comment Comment) error
39
}
40
41
// JetstreamConsumer is responsible for consuming from a jetstream instance
···
102
103
switch event.Commit.Operation {
104
case models.CommitOperationCreate, models.CommitOperationUpdate:
105
-
return h.handleCreateEvent(ctx, event)
106
-
// TODO: handle deletes too
107
default:
108
return nil
109
}
110
}
111
112
-
func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error {
113
switch event.Commit.Collection {
114
case tangled.RepoIssueNSID:
115
-
h.handleIssueEvent(ctx, event)
116
case tangled.RepoIssueCommentNSID:
117
-
h.handleIssueCommentEvent(ctx, event)
118
default:
119
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
120
return nil
···
123
return nil
124
}
125
126
-
func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) {
127
var issue tangled.RepoIssue
128
129
err := json.Unmarshal(event.Commit.Record, &issue)
···
162
slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
163
}
164
165
-
func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) {
166
var comment tangled.RepoIssueComment
167
168
err := json.Unmarshal(event.Commit.Record, &comment)
···
181
slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
182
createdAt = time.Now().UTC()
183
}
184
err = h.store.CreateComment(Comment{
185
AuthorDID: did,
186
RKey: rkey,
···
199
200
slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
201
}
···
29
RKey string `json:"rkey"`
30
Body string `json:"body"`
31
Issue string `json:"issue" `
32
CreatedAt int64 `json:"createdAt"`
33
}
34
35
type Store interface {
36
CreateIssue(issue Issue) error
37
CreateComment(comment Comment) error
38
+
DeleteIssue(did, rkey string) error
39
+
DeleteComment(did, rkey string) error
40
+
DeleteCommentsForIssue(issueURI string) error
41
}
42
43
// JetstreamConsumer is responsible for consuming from a jetstream instance
···
104
105
switch event.Commit.Operation {
106
case models.CommitOperationCreate, models.CommitOperationUpdate:
107
+
return h.handleCreateUpdateEvent(ctx, event)
108
+
case models.CommitOperationDelete:
109
+
return h.handleDeleteEvent(ctx, event)
110
default:
111
return nil
112
}
113
}
114
115
+
func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
116
switch event.Commit.Collection {
117
case tangled.RepoIssueNSID:
118
+
h.handleCreateUpdateIssueEvent(ctx, event)
119
case tangled.RepoIssueCommentNSID:
120
+
h.handleCreateUpdateIssueCommentEvent(ctx, event)
121
default:
122
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
123
return nil
···
126
return nil
127
}
128
129
+
func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
130
+
switch event.Commit.Collection {
131
+
case tangled.RepoIssueNSID:
132
+
h.handleDeleteIssueEvent(ctx, event)
133
+
case tangled.RepoIssueCommentNSID:
134
+
h.handleDeleteIssueCommentEvent(ctx, event)
135
+
default:
136
+
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
137
+
return nil
138
+
}
139
+
140
+
return nil
141
+
}
142
+
143
+
func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
144
var issue tangled.RepoIssue
145
146
err := json.Unmarshal(event.Commit.Record, &issue)
···
179
slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
180
}
181
182
+
func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
183
var comment tangled.RepoIssueComment
184
185
err := json.Unmarshal(event.Commit.Record, &comment)
···
198
slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
199
createdAt = time.Now().UTC()
200
}
201
+
202
+
// TODO: if there is a reply to present, don't store the comment because replies can't be replied to so
203
+
// the reply comment doesn't need to be stored
204
+
205
err = h.store.CreateComment(Comment{
206
AuthorDID: did,
207
RKey: rkey,
···
220
221
slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
222
}
223
+
224
+
func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
225
+
did := event.Did
226
+
rkey := event.Commit.RKey
227
+
228
+
err := h.store.DeleteIssue(did, rkey)
229
+
if err != nil {
230
+
bugsnag.Notify(err)
231
+
slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
232
+
return
233
+
}
234
+
235
+
// now attempt to delete any comments on that issue since they can't be replied to now.
236
+
// Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
237
+
// not visible on the UI and so no one will be able to reply to them so this is just a
238
+
// cleanup operation
239
+
issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
240
+
err = h.store.DeleteCommentsForIssue(issueURI)
241
+
if err != nil {
242
+
bugsnag.Notify(err)
243
+
slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
244
+
}
245
+
246
+
slog.Info("deleted issue ", "did", did, "rkey", rkey)
247
+
}
248
+
249
+
func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
250
+
did := event.Did
251
+
rkey := event.Commit.RKey
252
+
253
+
err := h.store.DeleteComment(did, rkey)
254
+
if err != nil {
255
+
bugsnag.Notify(err)
256
+
slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
257
+
return
258
+
}
259
+
260
+
slog.Info("deleted comment ", "did", did, "rkey", rkey)
261
+
}
+31
-5
database.go
+31
-5
database.go
···
104
"rkey" TEXT,
105
"body" TEXT,
106
"issue" TEXT,
107
-
"replyTo" TEXT,
108
"createdAt" integer NOT NULL,
109
UNIQUE(authorDid,rkey)
110
);`
···
135
136
// CreateComment will insert a comment into a database
137
func (d *Database) CreateComment(comment Comment) error {
138
-
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, replyTo, createdAt) VALUES (?, ?, ?, ?, ?, ?);`
139
-
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.ReplyTo, comment.CreatedAt)
140
if err != nil {
141
return fmt.Errorf("exec insert comment: %w", err)
142
}
···
164
}
165
166
func (d *Database) GetComments() ([]Comment, error) {
167
-
sql := "SELECT authorDid, rkey, body, issue, replyTo, createdAt FROM comments;"
168
rows, err := d.db.Query(sql)
169
if err != nil {
170
return nil, fmt.Errorf("run query to get comments: %w", err)
···
174
var results []Comment
175
for rows.Next() {
176
var comment Comment
177
-
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.ReplyTo, &comment.CreatedAt); err != nil {
178
return nil, fmt.Errorf("scan row: %w", err)
179
}
180
···
182
}
183
return results, nil
184
}
···
104
"rkey" TEXT,
105
"body" TEXT,
106
"issue" TEXT,
107
"createdAt" integer NOT NULL,
108
UNIQUE(authorDid,rkey)
109
);`
···
134
135
// CreateComment will insert a comment into a database
136
func (d *Database) CreateComment(comment Comment) error {
137
+
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, createdAt) VALUES (?, ?, ?, ?, ?);`
138
+
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.CreatedAt)
139
if err != nil {
140
return fmt.Errorf("exec insert comment: %w", err)
141
}
···
163
}
164
165
func (d *Database) GetComments() ([]Comment, error) {
166
+
sql := "SELECT authorDid, rkey, body, issue, createdAt FROM comments;"
167
rows, err := d.db.Query(sql)
168
if err != nil {
169
return nil, fmt.Errorf("run query to get comments: %w", err)
···
173
var results []Comment
174
for rows.Next() {
175
var comment Comment
176
+
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.CreatedAt); err != nil {
177
return nil, fmt.Errorf("scan row: %w", err)
178
}
179
···
181
}
182
return results, nil
183
}
184
+
185
+
func (d *Database) DeleteIssue(did, rkey string) error {
186
+
sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;"
187
+
_, err := d.db.Exec(sql, did, rkey)
188
+
if err != nil {
189
+
return fmt.Errorf("exec delete issue: %w", err)
190
+
}
191
+
return nil
192
+
}
193
+
194
+
func (d *Database) DeleteComment(did, rkey string) error {
195
+
sql := "DELETE FROM comments WHERE authorDid = ? AND rkey = ?;"
196
+
_, err := d.db.Exec(sql, did, rkey)
197
+
if err != nil {
198
+
return fmt.Errorf("exec delete issue: %w", err)
199
+
}
200
+
return nil
201
+
}
202
+
203
+
func (d *Database) DeleteCommentsForIssue(issueURI string) error {
204
+
sql := "DELETE FROM comments WHERE issue = ?;"
205
+
_, err := d.db.Exec(sql, issueURI)
206
+
if err != nil {
207
+
return fmt.Errorf("exec delete comments for issue")
208
+
}
209
+
return nil
210
+
}