forked from
tangled.org/core
Monorepo for Tangled
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
18const (
19 maxMentions = 8
20)
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39func (n *databaseNotifier) DeleteRepo(ctx context.Context, repo *models.Repo) {
40 // no-op for now
41}
42
43func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
44 l := log.FromContext(ctx)
45
46 if star.RepoAt.Collection().String() != tangled.RepoNSID {
47 // skip string stars for now
48 return
49 }
50 var err error
51 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
52 if err != nil {
53 l.Error("failed to get repos", "err", err)
54 return
55 }
56
57 actorDid := syntax.DID(star.Did)
58 recipients := sets.Singleton(syntax.DID(repo.Did))
59 eventType := models.NotificationTypeRepoStarred
60 entityType := "repo"
61 entityId := star.RepoAt.String()
62 repoId := &repo.Id
63 var issueId *int64
64 var pullId *int64
65
66 n.notifyEvent(
67 ctx,
68 actorDid,
69 recipients,
70 eventType,
71 entityType,
72 entityId,
73 repoId,
74 issueId,
75 pullId,
76 )
77}
78
79func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
80 // no-op
81}
82
83func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
84 l := log.FromContext(ctx)
85
86 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
87 if err != nil {
88 l.Error("failed to fetch collaborators", "err", err)
89 return
90 }
91
92 // build the recipients list
93 // - owner of the repo
94 // - collaborators in the repo
95 // - remove users already mentioned
96 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
97 for _, c := range collaborators {
98 recipients.Insert(c.SubjectDid)
99 }
100 for _, m := range mentions {
101 recipients.Remove(m)
102 }
103
104 actorDid := syntax.DID(issue.Did)
105 entityType := "issue"
106 entityId := issue.AtUri().String()
107 repoId := &issue.Repo.Id
108 issueId := &issue.Id
109 var pullId *int64
110
111 n.notifyEvent(
112 ctx,
113 actorDid,
114 recipients,
115 models.NotificationTypeIssueCreated,
116 entityType,
117 entityId,
118 repoId,
119 issueId,
120 pullId,
121 )
122 n.notifyEvent(
123 ctx,
124 actorDid,
125 sets.Collect(slices.Values(mentions)),
126 models.NotificationTypeUserMentioned,
127 entityType,
128 entityId,
129 repoId,
130 issueId,
131 pullId,
132 )
133}
134
135func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
136 l := log.FromContext(ctx)
137
138 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt))
139 if err != nil {
140 l.Error("failed to get issues", "err", err)
141 return
142 }
143 if len(issues) == 0 {
144 l.Error("no issue found for", "err", comment.IssueAt)
145 return
146 }
147 issue := issues[0]
148
149 // built the recipients list:
150 // - the owner of the repo
151 // - | if the comment is a reply -> everybody on that thread
152 // | if the comment is a top level -> just the issue owner
153 // - remove mentioned users from the recipients list
154 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
155
156 if comment.IsReply() {
157 // if this comment is a reply, then notify everybody in that thread
158 parentAtUri := *comment.ReplyTo
159
160 // find the parent thread, and add all DIDs from here to the recipient list
161 for _, t := range issue.CommentList() {
162 if t.Self.AtUri().String() == parentAtUri {
163 for _, p := range t.Participants() {
164 recipients.Insert(p)
165 }
166 }
167 }
168 } else {
169 // not a reply, notify just the issue author
170 recipients.Insert(syntax.DID(issue.Did))
171 }
172
173 for _, m := range mentions {
174 recipients.Remove(m)
175 }
176
177 actorDid := syntax.DID(comment.Did)
178 entityType := "issue"
179 entityId := issue.AtUri().String()
180 repoId := &issue.Repo.Id
181 issueId := &issue.Id
182 var pullId *int64
183
184 n.notifyEvent(
185 ctx,
186 actorDid,
187 recipients,
188 models.NotificationTypeIssueCommented,
189 entityType,
190 entityId,
191 repoId,
192 issueId,
193 pullId,
194 )
195 n.notifyEvent(
196 ctx,
197 actorDid,
198 sets.Collect(slices.Values(mentions)),
199 models.NotificationTypeUserMentioned,
200 entityType,
201 entityId,
202 repoId,
203 issueId,
204 pullId,
205 )
206}
207
208func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
209 // no-op for now
210}
211
212func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {}
213func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {}
214
215func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
216 actorDid := syntax.DID(follow.UserDid)
217 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
218 eventType := models.NotificationTypeFollowed
219 entityType := "follow"
220 entityId := follow.UserDid
221 var repoId, issueId, pullId *int64
222
223 n.notifyEvent(
224 ctx,
225 actorDid,
226 recipients,
227 eventType,
228 entityType,
229 entityId,
230 repoId,
231 issueId,
232 pullId,
233 )
234}
235
236func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
237 // no-op
238}
239
240func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
241 l := log.FromContext(ctx)
242
243 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
244 if err != nil {
245 l.Error("failed to get repos", "err", err)
246 return
247 }
248 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
249 if err != nil {
250 l.Error("failed to fetch collaborators", "err", err)
251 return
252 }
253
254 // build the recipients list
255 // - owner of the repo
256 // - collaborators in the repo
257 recipients := sets.Singleton(syntax.DID(repo.Did))
258 for _, c := range collaborators {
259 recipients.Insert(c.SubjectDid)
260 }
261
262 actorDid := syntax.DID(pull.OwnerDid)
263 eventType := models.NotificationTypePullCreated
264 entityType := "pull"
265 entityId := pull.AtUri().String()
266 repoId := &repo.Id
267 var issueId *int64
268 p := int64(pull.ID)
269 pullId := &p
270
271 n.notifyEvent(
272 ctx,
273 actorDid,
274 recipients,
275 eventType,
276 entityType,
277 entityId,
278 repoId,
279 issueId,
280 pullId,
281 )
282}
283
284func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
285 l := log.FromContext(ctx)
286
287 pull, err := db.GetPull(n.db,
288 syntax.ATURI(comment.RepoAt),
289 comment.PullId,
290 )
291 if err != nil {
292 l.Error("failed to get pulls", "err", err)
293 return
294 }
295
296 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt))
297 if err != nil {
298 l.Error("failed to get repos", "err", err)
299 return
300 }
301
302 // build up the recipients list:
303 // - repo owner
304 // - all pull participants
305 // - remove those already mentioned
306 recipients := sets.Singleton(syntax.DID(repo.Did))
307 for _, p := range pull.Participants() {
308 recipients.Insert(syntax.DID(p))
309 }
310 for _, m := range mentions {
311 recipients.Remove(m)
312 }
313
314 actorDid := syntax.DID(comment.OwnerDid)
315 eventType := models.NotificationTypePullCommented
316 entityType := "pull"
317 entityId := pull.AtUri().String()
318 repoId := &repo.Id
319 var issueId *int64
320 p := int64(pull.ID)
321 pullId := &p
322
323 n.notifyEvent(
324 ctx,
325 actorDid,
326 recipients,
327 eventType,
328 entityType,
329 entityId,
330 repoId,
331 issueId,
332 pullId,
333 )
334 n.notifyEvent(
335 ctx,
336 actorDid,
337 sets.Collect(slices.Values(mentions)),
338 models.NotificationTypeUserMentioned,
339 entityType,
340 entityId,
341 repoId,
342 issueId,
343 pullId,
344 )
345}
346
347func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
348 // no-op
349}
350
351func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
352 // no-op
353}
354
355func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
356 // no-op
357}
358
359func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
360 // no-op
361}
362
363func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
364 // no-op for now; webhooks are handled by the webhook notifier
365}
366
367func (n *databaseNotifier) Clone(ctx context.Context, repo *models.Repo) {
368 // no-op
369}
370
371func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
372 l := log.FromContext(ctx)
373
374 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
375 if err != nil {
376 l.Error("failed to fetch collaborators", "err", err)
377 return
378 }
379
380 // build up the recipients list:
381 // - repo owner
382 // - repo collaborators
383 // - all issue participants
384 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
385 for _, c := range collaborators {
386 recipients.Insert(c.SubjectDid)
387 }
388 for _, p := range issue.Participants() {
389 recipients.Insert(syntax.DID(p))
390 }
391
392 entityType := "issue"
393 entityId := issue.AtUri().String()
394 repoId := &issue.Repo.Id
395 issueId := &issue.Id
396 var pullId *int64
397 var eventType models.NotificationType
398
399 if issue.Open {
400 eventType = models.NotificationTypeIssueReopen
401 } else {
402 eventType = models.NotificationTypeIssueClosed
403 }
404
405 n.notifyEvent(
406 ctx,
407 actor,
408 recipients,
409 eventType,
410 entityType,
411 entityId,
412 repoId,
413 issueId,
414 pullId,
415 )
416}
417
418func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
419 l := log.FromContext(ctx)
420
421 // Get repo details
422 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
423 if err != nil {
424 l.Error("failed to get repos", "err", err)
425 return
426 }
427
428 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
429 if err != nil {
430 l.Error("failed to fetch collaborators", "err", err)
431 return
432 }
433
434 // build up the recipients list:
435 // - repo owner
436 // - all pull participants
437 recipients := sets.Singleton(syntax.DID(repo.Did))
438 for _, c := range collaborators {
439 recipients.Insert(c.SubjectDid)
440 }
441 for _, p := range pull.Participants() {
442 recipients.Insert(syntax.DID(p))
443 }
444
445 entityType := "pull"
446 entityId := pull.AtUri().String()
447 repoId := &repo.Id
448 var issueId *int64
449 var eventType models.NotificationType
450 switch pull.State {
451 case models.PullClosed:
452 eventType = models.NotificationTypePullClosed
453 case models.PullOpen:
454 eventType = models.NotificationTypePullReopen
455 case models.PullMerged:
456 eventType = models.NotificationTypePullMerged
457 default:
458 l.Error("unexpected new PR state", "state", pull.State)
459 return
460 }
461 p := int64(pull.ID)
462 pullId := &p
463
464 n.notifyEvent(
465 ctx,
466 actor,
467 recipients,
468 eventType,
469 entityType,
470 entityId,
471 repoId,
472 issueId,
473 pullId,
474 )
475}
476
477func (n *databaseNotifier) notifyEvent(
478 ctx context.Context,
479 actorDid syntax.DID,
480 recipients sets.Set[syntax.DID],
481 eventType models.NotificationType,
482 entityType string,
483 entityId string,
484 repoId *int64,
485 issueId *int64,
486 pullId *int64,
487) {
488 l := log.FromContext(ctx)
489
490 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
491 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
492 return
493 }
494
495 recipients.Remove(actorDid)
496
497 prefMap, err := db.GetNotificationPreferences(
498 n.db,
499 orm.FilterIn("user_did", slices.Collect(recipients.All())),
500 )
501 if err != nil {
502 // failed to get prefs for users
503 return
504 }
505
506 // create a transaction for bulk notification storage
507 tx, err := n.db.Begin()
508 if err != nil {
509 // failed to start tx
510 return
511 }
512 defer tx.Rollback()
513
514 // filter based on preferences
515 for recipientDid := range recipients.All() {
516 prefs, ok := prefMap[recipientDid]
517 if !ok {
518 prefs = models.DefaultNotificationPreferences(recipientDid)
519 }
520
521 // skip users who don’t want this type
522 if !prefs.ShouldNotify(eventType) {
523 continue
524 }
525
526 // create notification
527 notif := &models.Notification{
528 RecipientDid: recipientDid.String(),
529 ActorDid: actorDid.String(),
530 Type: eventType,
531 EntityType: entityType,
532 EntityId: entityId,
533 RepoId: repoId,
534 IssueId: issueId,
535 PullId: pullId,
536 }
537
538 if err := db.CreateNotification(tx, notif); err != nil {
539 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
540 }
541 }
542
543 if err := tx.Commit(); err != nil {
544 // failed to commit
545 return
546 }
547}