Monorepo for Tangled
at master 547 lines 12 kB view raw
1package db 2 3import ( 4 "context" 5 "slices" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/appview/db" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/appview/notify" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39func (n *databaseNotifier) DeleteRepo(ctx context.Context, repo *models.Repo) { 40 // no-op for now 41} 42 43func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 44 l := log.FromContext(ctx) 45 46 if star.RepoAt.Collection().String() != tangled.RepoNSID { 47 // skip string stars for now 48 return 49 } 50 var err error 51 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 52 if err != nil { 53 l.Error("failed to get repos", "err", err) 54 return 55 } 56 57 actorDid := syntax.DID(star.Did) 58 recipients := sets.Singleton(syntax.DID(repo.Did)) 59 eventType := models.NotificationTypeRepoStarred 60 entityType := "repo" 61 entityId := star.RepoAt.String() 62 repoId := &repo.Id 63 var issueId *int64 64 var pullId *int64 65 66 n.notifyEvent( 67 ctx, 68 actorDid, 69 recipients, 70 eventType, 71 entityType, 72 entityId, 73 repoId, 74 issueId, 75 pullId, 76 ) 77} 78 79func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 80 // no-op 81} 82 83func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 84 l := log.FromContext(ctx) 85 86 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 87 if err != nil { 88 l.Error("failed to fetch collaborators", "err", err) 89 return 90 } 91 92 // build the recipients list 93 // - owner of the repo 94 // - collaborators in the repo 95 // - remove users already mentioned 96 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 97 for _, c := range collaborators { 98 recipients.Insert(c.SubjectDid) 99 } 100 for _, m := range mentions { 101 recipients.Remove(m) 102 } 103 104 actorDid := syntax.DID(issue.Did) 105 entityType := "issue" 106 entityId := issue.AtUri().String() 107 repoId := &issue.Repo.Id 108 issueId := &issue.Id 109 var pullId *int64 110 111 n.notifyEvent( 112 ctx, 113 actorDid, 114 recipients, 115 models.NotificationTypeIssueCreated, 116 entityType, 117 entityId, 118 repoId, 119 issueId, 120 pullId, 121 ) 122 n.notifyEvent( 123 ctx, 124 actorDid, 125 sets.Collect(slices.Values(mentions)), 126 models.NotificationTypeUserMentioned, 127 entityType, 128 entityId, 129 repoId, 130 issueId, 131 pullId, 132 ) 133} 134 135func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 136 l := log.FromContext(ctx) 137 138 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt)) 139 if err != nil { 140 l.Error("failed to get issues", "err", err) 141 return 142 } 143 if len(issues) == 0 { 144 l.Error("no issue found for", "err", comment.IssueAt) 145 return 146 } 147 issue := issues[0] 148 149 // built the recipients list: 150 // - the owner of the repo 151 // - | if the comment is a reply -> everybody on that thread 152 // | if the comment is a top level -> just the issue owner 153 // - remove mentioned users from the recipients list 154 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 155 156 if comment.IsReply() { 157 // if this comment is a reply, then notify everybody in that thread 158 parentAtUri := *comment.ReplyTo 159 160 // find the parent thread, and add all DIDs from here to the recipient list 161 for _, t := range issue.CommentList() { 162 if t.Self.AtUri().String() == parentAtUri { 163 for _, p := range t.Participants() { 164 recipients.Insert(p) 165 } 166 } 167 } 168 } else { 169 // not a reply, notify just the issue author 170 recipients.Insert(syntax.DID(issue.Did)) 171 } 172 173 for _, m := range mentions { 174 recipients.Remove(m) 175 } 176 177 actorDid := syntax.DID(comment.Did) 178 entityType := "issue" 179 entityId := issue.AtUri().String() 180 repoId := &issue.Repo.Id 181 issueId := &issue.Id 182 var pullId *int64 183 184 n.notifyEvent( 185 ctx, 186 actorDid, 187 recipients, 188 models.NotificationTypeIssueCommented, 189 entityType, 190 entityId, 191 repoId, 192 issueId, 193 pullId, 194 ) 195 n.notifyEvent( 196 ctx, 197 actorDid, 198 sets.Collect(slices.Values(mentions)), 199 models.NotificationTypeUserMentioned, 200 entityType, 201 entityId, 202 repoId, 203 issueId, 204 pullId, 205 ) 206} 207 208func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 209 // no-op for now 210} 211 212func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {} 213func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {} 214 215func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 216 actorDid := syntax.DID(follow.UserDid) 217 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 218 eventType := models.NotificationTypeFollowed 219 entityType := "follow" 220 entityId := follow.UserDid 221 var repoId, issueId, pullId *int64 222 223 n.notifyEvent( 224 ctx, 225 actorDid, 226 recipients, 227 eventType, 228 entityType, 229 entityId, 230 repoId, 231 issueId, 232 pullId, 233 ) 234} 235 236func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 237 // no-op 238} 239 240func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 241 l := log.FromContext(ctx) 242 243 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 244 if err != nil { 245 l.Error("failed to get repos", "err", err) 246 return 247 } 248 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 249 if err != nil { 250 l.Error("failed to fetch collaborators", "err", err) 251 return 252 } 253 254 // build the recipients list 255 // - owner of the repo 256 // - collaborators in the repo 257 recipients := sets.Singleton(syntax.DID(repo.Did)) 258 for _, c := range collaborators { 259 recipients.Insert(c.SubjectDid) 260 } 261 262 actorDid := syntax.DID(pull.OwnerDid) 263 eventType := models.NotificationTypePullCreated 264 entityType := "pull" 265 entityId := pull.AtUri().String() 266 repoId := &repo.Id 267 var issueId *int64 268 p := int64(pull.ID) 269 pullId := &p 270 271 n.notifyEvent( 272 ctx, 273 actorDid, 274 recipients, 275 eventType, 276 entityType, 277 entityId, 278 repoId, 279 issueId, 280 pullId, 281 ) 282} 283 284func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 285 l := log.FromContext(ctx) 286 287 pull, err := db.GetPull(n.db, 288 syntax.ATURI(comment.RepoAt), 289 comment.PullId, 290 ) 291 if err != nil { 292 l.Error("failed to get pulls", "err", err) 293 return 294 } 295 296 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt)) 297 if err != nil { 298 l.Error("failed to get repos", "err", err) 299 return 300 } 301 302 // build up the recipients list: 303 // - repo owner 304 // - all pull participants 305 // - remove those already mentioned 306 recipients := sets.Singleton(syntax.DID(repo.Did)) 307 for _, p := range pull.Participants() { 308 recipients.Insert(syntax.DID(p)) 309 } 310 for _, m := range mentions { 311 recipients.Remove(m) 312 } 313 314 actorDid := syntax.DID(comment.OwnerDid) 315 eventType := models.NotificationTypePullCommented 316 entityType := "pull" 317 entityId := pull.AtUri().String() 318 repoId := &repo.Id 319 var issueId *int64 320 p := int64(pull.ID) 321 pullId := &p 322 323 n.notifyEvent( 324 ctx, 325 actorDid, 326 recipients, 327 eventType, 328 entityType, 329 entityId, 330 repoId, 331 issueId, 332 pullId, 333 ) 334 n.notifyEvent( 335 ctx, 336 actorDid, 337 sets.Collect(slices.Values(mentions)), 338 models.NotificationTypeUserMentioned, 339 entityType, 340 entityId, 341 repoId, 342 issueId, 343 pullId, 344 ) 345} 346 347func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 348 // no-op 349} 350 351func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 352 // no-op 353} 354 355func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 356 // no-op 357} 358 359func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 360 // no-op 361} 362 363func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 364 // no-op for now; webhooks are handled by the webhook notifier 365} 366 367func (n *databaseNotifier) Clone(ctx context.Context, repo *models.Repo) { 368 // no-op 369} 370 371func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 372 l := log.FromContext(ctx) 373 374 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 375 if err != nil { 376 l.Error("failed to fetch collaborators", "err", err) 377 return 378 } 379 380 // build up the recipients list: 381 // - repo owner 382 // - repo collaborators 383 // - all issue participants 384 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 385 for _, c := range collaborators { 386 recipients.Insert(c.SubjectDid) 387 } 388 for _, p := range issue.Participants() { 389 recipients.Insert(syntax.DID(p)) 390 } 391 392 entityType := "issue" 393 entityId := issue.AtUri().String() 394 repoId := &issue.Repo.Id 395 issueId := &issue.Id 396 var pullId *int64 397 var eventType models.NotificationType 398 399 if issue.Open { 400 eventType = models.NotificationTypeIssueReopen 401 } else { 402 eventType = models.NotificationTypeIssueClosed 403 } 404 405 n.notifyEvent( 406 ctx, 407 actor, 408 recipients, 409 eventType, 410 entityType, 411 entityId, 412 repoId, 413 issueId, 414 pullId, 415 ) 416} 417 418func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 419 l := log.FromContext(ctx) 420 421 // Get repo details 422 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 423 if err != nil { 424 l.Error("failed to get repos", "err", err) 425 return 426 } 427 428 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 429 if err != nil { 430 l.Error("failed to fetch collaborators", "err", err) 431 return 432 } 433 434 // build up the recipients list: 435 // - repo owner 436 // - all pull participants 437 recipients := sets.Singleton(syntax.DID(repo.Did)) 438 for _, c := range collaborators { 439 recipients.Insert(c.SubjectDid) 440 } 441 for _, p := range pull.Participants() { 442 recipients.Insert(syntax.DID(p)) 443 } 444 445 entityType := "pull" 446 entityId := pull.AtUri().String() 447 repoId := &repo.Id 448 var issueId *int64 449 var eventType models.NotificationType 450 switch pull.State { 451 case models.PullClosed: 452 eventType = models.NotificationTypePullClosed 453 case models.PullOpen: 454 eventType = models.NotificationTypePullReopen 455 case models.PullMerged: 456 eventType = models.NotificationTypePullMerged 457 default: 458 l.Error("unexpected new PR state", "state", pull.State) 459 return 460 } 461 p := int64(pull.ID) 462 pullId := &p 463 464 n.notifyEvent( 465 ctx, 466 actor, 467 recipients, 468 eventType, 469 entityType, 470 entityId, 471 repoId, 472 issueId, 473 pullId, 474 ) 475} 476 477func (n *databaseNotifier) notifyEvent( 478 ctx context.Context, 479 actorDid syntax.DID, 480 recipients sets.Set[syntax.DID], 481 eventType models.NotificationType, 482 entityType string, 483 entityId string, 484 repoId *int64, 485 issueId *int64, 486 pullId *int64, 487) { 488 l := log.FromContext(ctx) 489 490 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 491 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 492 return 493 } 494 495 recipients.Remove(actorDid) 496 497 prefMap, err := db.GetNotificationPreferences( 498 n.db, 499 orm.FilterIn("user_did", slices.Collect(recipients.All())), 500 ) 501 if err != nil { 502 // failed to get prefs for users 503 return 504 } 505 506 // create a transaction for bulk notification storage 507 tx, err := n.db.Begin() 508 if err != nil { 509 // failed to start tx 510 return 511 } 512 defer tx.Rollback() 513 514 // filter based on preferences 515 for recipientDid := range recipients.All() { 516 prefs, ok := prefMap[recipientDid] 517 if !ok { 518 prefs = models.DefaultNotificationPreferences(recipientDid) 519 } 520 521 // skip users who don’t want this type 522 if !prefs.ShouldNotify(eventType) { 523 continue 524 } 525 526 // create notification 527 notif := &models.Notification{ 528 RecipientDid: recipientDid.String(), 529 ActorDid: actorDid.String(), 530 Type: eventType, 531 EntityType: entityType, 532 EntityId: entityId, 533 RepoId: repoId, 534 IssueId: issueId, 535 PullId: pullId, 536 } 537 538 if err := db.CreateNotification(tx, notif); err != nil { 539 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err) 540 } 541 } 542 543 if err := tx.Commit(); err != nil { 544 // failed to commit 545 return 546 } 547}