Monorepo for Tangled tangled.org
at sl/comment 1368 lines 34 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "sync" 16 17 "time" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/ipfs/go-cid" 24 "golang.org/x/sync/errgroup" 25 "tangled.org/core/api/tangled" 26 "tangled.org/core/appview/config" 27 "tangled.org/core/appview/db" 28 "tangled.org/core/appview/mentions" 29 "tangled.org/core/appview/models" 30 "tangled.org/core/appview/notify" 31 "tangled.org/core/appview/serververify" 32 "tangled.org/core/appview/validator" 33 "tangled.org/core/idresolver" 34 "tangled.org/core/orm" 35 "tangled.org/core/rbac" 36) 37 38type Ingester struct { 39 Db db.DbWrapper 40 Enforcer *rbac.Enforcer 41 IdResolver *idresolver.Resolver 42 Config *config.Config 43 Logger *slog.Logger 44 Validator *validator.Validator 45 MentionsResolver *mentions.Resolver 46 Notifier notify.Notifier 47} 48 49type processFunc func(ctx context.Context, e *jmodels.Event) error 50 51func (i *Ingester) Ingest() processFunc { 52 return func(ctx context.Context, e *jmodels.Event) error { 53 var err error 54 55 l := i.Logger.With("kind", e.Kind) 56 switch e.Kind { 57 case jmodels.EventKindAccount: 58 // TODO: sync account state to db 59 if e.Account.Active { 60 break 61 } 62 // TODO: revoke sessions by DID 63 if *e.Account.Status == "deactivated" { 64 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 65 } 66 case jmodels.EventKindIdentity: 67 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 68 case jmodels.EventKindCommit: 69 switch e.Commit.Collection { 70 case tangled.GraphFollowNSID: 71 err = i.ingestFollow(e) 72 case tangled.FeedStarNSID: 73 err = i.ingestStar(e) 74 case tangled.PublicKeyNSID: 75 err = i.ingestPublicKey(e) 76 case tangled.RepoArtifactNSID: 77 err = i.ingestArtifact(e) 78 case tangled.ActorProfileNSID: 79 err = i.ingestProfile(ctx, e) 80 case tangled.SpindleMemberNSID: 81 err = i.ingestSpindleMember(ctx, e) 82 case tangled.SpindleNSID: 83 err = i.ingestSpindle(ctx, e) 84 case tangled.KnotMemberNSID: 85 err = i.ingestKnotMember(e) 86 case tangled.KnotNSID: 87 err = i.ingestKnot(e) 88 case tangled.StringNSID: 89 err = i.ingestString(e) 90 case tangled.RepoIssueNSID: 91 err = i.ingestIssue(ctx, e) 92 case tangled.RepoPullNSID: 93 err = i.ingestPull(ctx, e) 94 case tangled.FeedCommentNSID: 95 err = i.ingestComment(e) 96 case tangled.RepoIssueCommentNSID: 97 err = i.ingestIssueComment(e) 98 case tangled.RepoPullCommentNSID: 99 err = i.ingestPullComment(e) 100 case tangled.LabelDefinitionNSID: 101 err = i.ingestLabelDefinition(e) 102 case tangled.LabelOpNSID: 103 err = i.ingestLabelOp(e) 104 } 105 l = i.Logger.With("nsid", e.Commit.Collection) 106 } 107 108 if err != nil { 109 l.Warn("failed to ingest record, skipping", "err", err) 110 } 111 112 lastTimeUs := e.TimeUS + 1 113 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 114 l.Error("failed to save cursor", "err", saveErr) 115 } 116 117 return nil 118 } 119} 120 121func (i *Ingester) ingestStar(e *jmodels.Event) error { 122 var err error 123 did := e.Did 124 125 l := i.Logger.With("handler", "ingestStar") 126 l = l.With("nsid", e.Commit.Collection) 127 128 switch e.Commit.Operation { 129 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 130 var subjectUri syntax.ATURI 131 132 raw := json.RawMessage(e.Commit.Record) 133 record := tangled.FeedStar{} 134 err := json.Unmarshal(raw, &record) 135 if err != nil { 136 l.Error("invalid record", "err", err) 137 return err 138 } 139 140 star := &models.Star{ 141 Did: did, 142 Rkey: e.Commit.RKey, 143 } 144 145 switch { 146 case record.SubjectDid != nil: 147 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 148 if repoErr == nil { 149 subjectUri = repo.RepoAt() 150 star.RepoAt = subjectUri 151 } 152 case record.Subject != nil: 153 subjectUri, err = syntax.ParseATURI(*record.Subject) 154 if err != nil { 155 l.Error("invalid record", "err", err) 156 return err 157 } 158 star.RepoAt = subjectUri 159 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 160 if repoErr == nil && repo.RepoDid != "" { 161 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 162 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 163 } 164 } 165 default: 166 l.Error("star record has neither subject nor subjectDid") 167 return fmt.Errorf("star record has neither subject nor subjectDid") 168 } 169 err = db.AddStar(i.Db, star) 170 case jmodels.CommitOperationDelete: 171 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 172 } 173 174 if err != nil { 175 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 176 } 177 178 return nil 179} 180 181func (i *Ingester) ingestFollow(e *jmodels.Event) error { 182 var err error 183 did := e.Did 184 185 l := i.Logger.With("handler", "ingestFollow") 186 l = l.With("nsid", e.Commit.Collection) 187 188 switch e.Commit.Operation { 189 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 190 raw := json.RawMessage(e.Commit.Record) 191 record := tangled.GraphFollow{} 192 err = json.Unmarshal(raw, &record) 193 if err != nil { 194 l.Error("invalid record", "err", err) 195 return err 196 } 197 198 err = db.AddFollow(i.Db, &models.Follow{ 199 UserDid: did, 200 SubjectDid: record.Subject, 201 Rkey: e.Commit.RKey, 202 }) 203 case jmodels.CommitOperationDelete: 204 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 205 } 206 207 if err != nil { 208 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 209 } 210 211 return nil 212} 213 214func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 215 did := e.Did 216 var err error 217 218 l := i.Logger.With("handler", "ingestPublicKey") 219 l = l.With("nsid", e.Commit.Collection) 220 221 switch e.Commit.Operation { 222 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 223 l.Debug("processing add of pubkey") 224 raw := json.RawMessage(e.Commit.Record) 225 record := tangled.PublicKey{} 226 err = json.Unmarshal(raw, &record) 227 if err != nil { 228 l.Error("invalid record", "err", err) 229 return err 230 } 231 232 name := record.Name 233 key := record.Key 234 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 235 case jmodels.CommitOperationDelete: 236 l.Debug("processing delete of pubkey") 237 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 238 } 239 240 if err != nil { 241 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 242 } 243 244 return nil 245} 246 247func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 248 did := e.Did 249 var err error 250 251 l := i.Logger.With("handler", "ingestArtifact") 252 l = l.With("nsid", e.Commit.Collection) 253 254 switch e.Commit.Operation { 255 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 256 raw := json.RawMessage(e.Commit.Record) 257 record := tangled.RepoArtifact{} 258 err = json.Unmarshal(raw, &record) 259 if err != nil { 260 l.Error("invalid record", "err", err) 261 return err 262 } 263 264 var repo *models.Repo 265 if record.RepoDid != nil && *record.RepoDid != "" { 266 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 267 if err != nil && !errors.Is(err, sql.ErrNoRows) { 268 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 269 } 270 } 271 if repo == nil && record.Repo != nil { 272 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 273 if parseErr != nil { 274 return parseErr 275 } 276 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 277 if err != nil { 278 return err 279 } 280 } 281 if repo == nil { 282 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 283 } 284 285 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 286 if err != nil || !ok { 287 return err 288 } 289 290 repoDid := repo.RepoDid 291 if repoDid == "" && record.RepoDid != nil { 292 repoDid = *record.RepoDid 293 } 294 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 295 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 296 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 297 } 298 } 299 300 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 301 if err != nil { 302 createdAt = time.Now() 303 } 304 305 artifact := models.Artifact{ 306 Did: did, 307 Rkey: e.Commit.RKey, 308 RepoAt: repo.RepoAt(), 309 Tag: plumbing.Hash(record.Tag), 310 CreatedAt: createdAt, 311 BlobCid: cid.Cid(record.Artifact.Ref), 312 Name: record.Name, 313 Size: uint64(record.Artifact.Size), 314 MimeType: record.Artifact.MimeType, 315 } 316 317 err = db.AddArtifact(i.Db, artifact) 318 case jmodels.CommitOperationDelete: 319 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 320 } 321 322 if err != nil { 323 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 324 } 325 326 return nil 327} 328 329func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 330 did := e.Did 331 var err error 332 333 l := i.Logger.With("handler", "ingestProfile") 334 l = l.With("nsid", e.Commit.Collection) 335 336 if e.Commit.RKey != "self" { 337 return fmt.Errorf("ingestProfile only ingests `self` record") 338 } 339 340 switch e.Commit.Operation { 341 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 342 raw := json.RawMessage(e.Commit.Record) 343 record := tangled.ActorProfile{} 344 err = json.Unmarshal(raw, &record) 345 if err != nil { 346 l.Error("invalid record", "err", err) 347 return err 348 } 349 350 avatar := "" 351 if record.Avatar != nil { 352 avatar = record.Avatar.Ref.String() 353 } 354 355 description := "" 356 if record.Description != nil { 357 description = *record.Description 358 } 359 360 includeBluesky := record.Bluesky 361 362 pronouns := "" 363 if record.Pronouns != nil { 364 pronouns = *record.Pronouns 365 } 366 367 location := "" 368 if record.Location != nil { 369 location = *record.Location 370 } 371 372 var links [5]string 373 for i, l := range record.Links { 374 if i < 5 { 375 links[i] = l 376 } 377 } 378 379 var stats [2]models.VanityStat 380 for i, s := range record.Stats { 381 if i < 2 { 382 stats[i].Kind = models.ParseVanityStatKind(s) 383 } 384 } 385 386 var pinned [6]string 387 for i, r := range record.PinnedRepositories { 388 if i < 6 { 389 pinned[i] = r 390 } 391 } 392 393 var preferredHandle syntax.Handle 394 if record.PreferredHandle != nil { 395 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 396 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 397 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 398 preferredHandle = h 399 } 400 } 401 } 402 403 profile := models.Profile{ 404 Did: did, 405 Avatar: avatar, 406 Description: description, 407 IncludeBluesky: includeBluesky, 408 Location: location, 409 Links: links, 410 Stats: stats, 411 PinnedRepos: pinned, 412 Pronouns: pronouns, 413 PreferredHandle: preferredHandle, 414 } 415 416 ddb, ok := i.Db.Execer.(*db.DB) 417 if !ok { 418 return fmt.Errorf("failed to index profile record, invalid db cast") 419 } 420 421 tx, err := ddb.Begin() 422 if err != nil { 423 return fmt.Errorf("failed to start transaction") 424 } 425 426 err = db.ValidateProfile(tx, &profile) 427 if err != nil { 428 return fmt.Errorf("invalid profile record") 429 } 430 431 err = db.UpsertProfile(tx, &profile) 432 case jmodels.CommitOperationDelete: 433 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 434 } 435 436 if err != nil { 437 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 438 } 439 440 return nil 441} 442 443func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 444 did := e.Did 445 var err error 446 447 l := i.Logger.With("handler", "ingestSpindleMember") 448 l = l.With("nsid", e.Commit.Collection) 449 450 switch e.Commit.Operation { 451 case jmodels.CommitOperationCreate: 452 raw := json.RawMessage(e.Commit.Record) 453 record := tangled.SpindleMember{} 454 err = json.Unmarshal(raw, &record) 455 if err != nil { 456 l.Error("invalid record", "err", err) 457 return err 458 } 459 460 // only spindle owner can invite to spindles 461 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 462 if err != nil || !ok { 463 return fmt.Errorf("failed to enforce permissions: %w", err) 464 } 465 466 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 467 if err != nil { 468 return err 469 } 470 471 if memberId.Handle.IsInvalidHandle() { 472 return err 473 } 474 475 ddb, ok := i.Db.Execer.(*db.DB) 476 if !ok { 477 return fmt.Errorf("invalid db cast") 478 } 479 480 err = db.AddSpindleMember(ddb, models.SpindleMember{ 481 Did: syntax.DID(did), 482 Rkey: e.Commit.RKey, 483 Instance: record.Instance, 484 Subject: memberId.DID, 485 }) 486 if !ok { 487 return fmt.Errorf("failed to add to db: %w", err) 488 } 489 490 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 491 if err != nil { 492 return fmt.Errorf("failed to update ACLs: %w", err) 493 } 494 495 l.Info("added spindle member") 496 case jmodels.CommitOperationDelete: 497 rkey := e.Commit.RKey 498 499 ddb, ok := i.Db.Execer.(*db.DB) 500 if !ok { 501 return fmt.Errorf("failed to index profile record, invalid db cast") 502 } 503 504 // get record from db first 505 members, err := db.GetSpindleMembers( 506 ddb, 507 orm.FilterEq("did", did), 508 orm.FilterEq("rkey", rkey), 509 ) 510 if err != nil || len(members) != 1 { 511 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 512 } 513 member := members[0] 514 515 tx, err := ddb.Begin() 516 if err != nil { 517 return fmt.Errorf("failed to start txn: %w", err) 518 } 519 520 // remove record by rkey && update enforcer 521 if err = db.RemoveSpindleMember( 522 tx, 523 orm.FilterEq("did", did), 524 orm.FilterEq("rkey", rkey), 525 ); err != nil { 526 return fmt.Errorf("failed to remove from db: %w", err) 527 } 528 529 // update enforcer 530 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 531 if err != nil { 532 return fmt.Errorf("failed to update ACLs: %w", err) 533 } 534 535 if err = tx.Commit(); err != nil { 536 return fmt.Errorf("failed to commit txn: %w", err) 537 } 538 539 if err = i.Enforcer.E.SavePolicy(); err != nil { 540 return fmt.Errorf("failed to save ACLs: %w", err) 541 } 542 543 l.Info("removed spindle member") 544 } 545 546 return nil 547} 548 549func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 550 did := e.Did 551 var err error 552 553 l := i.Logger.With("handler", "ingestSpindle") 554 l = l.With("nsid", e.Commit.Collection) 555 556 switch e.Commit.Operation { 557 case jmodels.CommitOperationCreate: 558 raw := json.RawMessage(e.Commit.Record) 559 record := tangled.Spindle{} 560 err = json.Unmarshal(raw, &record) 561 if err != nil { 562 l.Error("invalid record", "err", err) 563 return err 564 } 565 566 instance := e.Commit.RKey 567 568 ddb, ok := i.Db.Execer.(*db.DB) 569 if !ok { 570 return fmt.Errorf("failed to index profile record, invalid db cast") 571 } 572 573 err := db.AddSpindle(ddb, models.Spindle{ 574 Owner: syntax.DID(did), 575 Instance: instance, 576 }) 577 if err != nil { 578 l.Error("failed to add spindle to db", "err", err, "instance", instance) 579 return err 580 } 581 582 err = retry.Do( 583 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 584 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 585 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 586 ) 587 if err != nil { 588 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 589 return err 590 } 591 592 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 593 if err != nil { 594 return fmt.Errorf("failed to mark verified: %w", err) 595 } 596 597 return nil 598 599 case jmodels.CommitOperationDelete: 600 instance := e.Commit.RKey 601 602 ddb, ok := i.Db.Execer.(*db.DB) 603 if !ok { 604 return fmt.Errorf("failed to index profile record, invalid db cast") 605 } 606 607 // get record from db first 608 spindles, err := db.GetSpindles( 609 ctx, 610 ddb, 611 orm.FilterEq("owner", did), 612 orm.FilterEq("instance", instance), 613 ) 614 if err != nil || len(spindles) != 1 { 615 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 616 } 617 spindle := spindles[0] 618 619 tx, err := ddb.Begin() 620 if err != nil { 621 return err 622 } 623 defer func() { 624 tx.Rollback() 625 i.Enforcer.E.LoadPolicy() 626 }() 627 628 // remove spindle members first 629 err = db.RemoveSpindleMember( 630 tx, 631 orm.FilterEq("owner", did), 632 orm.FilterEq("instance", instance), 633 ) 634 if err != nil { 635 return err 636 } 637 638 err = db.DeleteSpindle( 639 tx, 640 orm.FilterEq("owner", did), 641 orm.FilterEq("instance", instance), 642 ) 643 if err != nil { 644 return err 645 } 646 647 if spindle.Verified != nil { 648 err = i.Enforcer.RemoveSpindle(instance) 649 if err != nil { 650 return err 651 } 652 } 653 654 err = tx.Commit() 655 if err != nil { 656 return err 657 } 658 659 err = i.Enforcer.E.SavePolicy() 660 if err != nil { 661 return err 662 } 663 } 664 665 return nil 666} 667 668func (i *Ingester) ingestString(e *jmodels.Event) error { 669 did := e.Did 670 rkey := e.Commit.RKey 671 672 var err error 673 674 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 675 l.Info("ingesting record") 676 677 ddb, ok := i.Db.Execer.(*db.DB) 678 if !ok { 679 return fmt.Errorf("failed to index string record, invalid db cast") 680 } 681 682 switch e.Commit.Operation { 683 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 684 raw := json.RawMessage(e.Commit.Record) 685 record := tangled.String{} 686 err = json.Unmarshal(raw, &record) 687 if err != nil { 688 l.Error("invalid record", "err", err) 689 return err 690 } 691 692 string := models.StringFromRecord(did, rkey, record) 693 694 if err = i.Validator.ValidateString(&string); err != nil { 695 l.Error("invalid record", "err", err) 696 return err 697 } 698 699 if err = db.AddString(ddb, string); err != nil { 700 l.Error("failed to add string", "err", err) 701 return err 702 } 703 704 return nil 705 706 case jmodels.CommitOperationDelete: 707 if err := db.DeleteString( 708 ddb, 709 orm.FilterEq("did", did), 710 orm.FilterEq("rkey", rkey), 711 ); err != nil { 712 l.Error("failed to delete", "err", err) 713 return fmt.Errorf("failed to delete string record: %w", err) 714 } 715 716 return nil 717 } 718 719 return nil 720} 721 722func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 723 did := e.Did 724 var err error 725 726 l := i.Logger.With("handler", "ingestKnotMember") 727 l = l.With("nsid", e.Commit.Collection) 728 729 switch e.Commit.Operation { 730 case jmodels.CommitOperationCreate: 731 raw := json.RawMessage(e.Commit.Record) 732 record := tangled.KnotMember{} 733 err = json.Unmarshal(raw, &record) 734 if err != nil { 735 l.Error("invalid record", "err", err) 736 return err 737 } 738 739 // only knot owner can invite to knots 740 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 741 if err != nil || !ok { 742 return fmt.Errorf("failed to enforce permissions: %w", err) 743 } 744 745 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 746 if err != nil { 747 return err 748 } 749 750 if memberId.Handle.IsInvalidHandle() { 751 return err 752 } 753 754 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 755 if err != nil { 756 return fmt.Errorf("failed to update ACLs: %w", err) 757 } 758 759 l.Info("added knot member") 760 case jmodels.CommitOperationDelete: 761 // we don't store knot members in a table (like we do for spindle) 762 // and we can't remove this just yet. possibly fixed if we switch 763 // to either: 764 // 1. a knot_members table like with spindle and store the rkey 765 // 2. use the knot host as the rkey 766 // 767 // TODO: implement member deletion 768 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 769 } 770 771 return nil 772} 773 774func (i *Ingester) ingestKnot(e *jmodels.Event) error { 775 did := e.Did 776 var err error 777 778 l := i.Logger.With("handler", "ingestKnot") 779 l = l.With("nsid", e.Commit.Collection) 780 781 switch e.Commit.Operation { 782 case jmodels.CommitOperationCreate: 783 raw := json.RawMessage(e.Commit.Record) 784 record := tangled.Knot{} 785 err = json.Unmarshal(raw, &record) 786 if err != nil { 787 l.Error("invalid record", "err", err) 788 return err 789 } 790 791 domain := e.Commit.RKey 792 793 ddb, ok := i.Db.Execer.(*db.DB) 794 if !ok { 795 return fmt.Errorf("failed to index profile record, invalid db cast") 796 } 797 798 err := db.AddKnot(ddb, domain, did) 799 if err != nil { 800 l.Error("failed to add knot to db", "err", err, "domain", domain) 801 return err 802 } 803 804 err = retry.Do( 805 func() error { 806 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 807 }, 808 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 809 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 810 ) 811 if err != nil { 812 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 813 return err 814 } 815 816 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 817 if err != nil { 818 return fmt.Errorf("failed to mark verified: %w", err) 819 } 820 821 return nil 822 823 case jmodels.CommitOperationDelete: 824 domain := e.Commit.RKey 825 826 ddb, ok := i.Db.Execer.(*db.DB) 827 if !ok { 828 return fmt.Errorf("failed to index knot record, invalid db cast") 829 } 830 831 // get record from db first 832 registrations, err := db.GetRegistrations( 833 ddb, 834 orm.FilterEq("domain", domain), 835 orm.FilterEq("did", did), 836 ) 837 if err != nil { 838 return fmt.Errorf("failed to get registration: %w", err) 839 } 840 if len(registrations) != 1 { 841 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 842 } 843 registration := registrations[0] 844 845 tx, err := ddb.Begin() 846 if err != nil { 847 return err 848 } 849 defer func() { 850 tx.Rollback() 851 i.Enforcer.E.LoadPolicy() 852 }() 853 854 err = db.DeleteKnot( 855 tx, 856 orm.FilterEq("did", did), 857 orm.FilterEq("domain", domain), 858 ) 859 if err != nil { 860 return err 861 } 862 863 if registration.Registered != nil { 864 err = i.Enforcer.RemoveKnot(domain) 865 if err != nil { 866 return err 867 } 868 } 869 870 err = tx.Commit() 871 if err != nil { 872 return err 873 } 874 875 err = i.Enforcer.E.SavePolicy() 876 if err != nil { 877 return err 878 } 879 } 880 881 return nil 882} 883func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 884 did := e.Did 885 rkey := e.Commit.RKey 886 887 var err error 888 889 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 890 l.Info("ingesting record") 891 892 ddb, ok := i.Db.Execer.(*db.DB) 893 if !ok { 894 return fmt.Errorf("failed to index issue record, invalid db cast") 895 } 896 897 switch e.Commit.Operation { 898 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 899 raw := json.RawMessage(e.Commit.Record) 900 record := tangled.RepoIssue{} 901 err = json.Unmarshal(raw, &record) 902 if err != nil { 903 l.Error("invalid record", "err", err) 904 return err 905 } 906 907 issue := models.IssueFromRecord(did, rkey, record) 908 909 if issue.RepoAt == "" { 910 return fmt.Errorf("issue record has no repo field") 911 } 912 913 if err := i.Validator.ValidateIssue(&issue); err != nil { 914 return fmt.Errorf("failed to validate issue: %w", err) 915 } 916 917 if record.Repo != nil { 918 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 919 if repoErr == nil && repo.RepoDid != "" { 920 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 921 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 922 } 923 } 924 } 925 926 tx, err := ddb.BeginTx(ctx, nil) 927 if err != nil { 928 l.Error("failed to begin transaction", "err", err) 929 return err 930 } 931 defer tx.Rollback() 932 933 err = db.PutIssue(tx, &issue) 934 if err != nil { 935 l.Error("failed to create issue", "err", err) 936 return err 937 } 938 939 err = tx.Commit() 940 if err != nil { 941 l.Error("failed to commit txn", "err", err) 942 return err 943 } 944 945 return nil 946 947 case jmodels.CommitOperationDelete: 948 tx, err := ddb.BeginTx(ctx, nil) 949 if err != nil { 950 l.Error("failed to begin transaction", "err", err) 951 return err 952 } 953 defer tx.Rollback() 954 955 if err := db.DeleteIssues( 956 tx, 957 did, 958 rkey, 959 ); err != nil { 960 l.Error("failed to delete", "err", err) 961 return fmt.Errorf("failed to delete issue record: %w", err) 962 } 963 if err := tx.Commit(); err != nil { 964 l.Error("failed to commit txn", "err", err) 965 return err 966 } 967 968 return nil 969 } 970 971 return nil 972} 973 974func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 975 did := e.Did 976 rkey := e.Commit.RKey 977 978 var err error 979 980 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 981 l.Info("ingesting record") 982 983 ddb, ok := i.Db.Execer.(*db.DB) 984 if !ok { 985 return fmt.Errorf("failed to index pull record, invalid db cast") 986 } 987 988 switch e.Commit.Operation { 989 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 990 raw := json.RawMessage(e.Commit.Record) 991 record := tangled.RepoPull{} 992 err = json.Unmarshal(raw, &record) 993 if err != nil { 994 l.Error("invalid record", "err", err) 995 return err 996 } 997 998 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 999 if err != nil { 1000 l.Error("failed to resolve did") 1001 return err 1002 } 1003 1004 // go through and fetch all blobs in parallel 1005 readers := make([]*io.ReadCloser, len(record.Rounds)) 1006 var mu sync.Mutex 1007 1008 g, gctx := errgroup.WithContext(ctx) 1009 1010 for idx, b := range record.Rounds { 1011 g.Go(func() error { 1012 // for some reason, a blob is empty 1013 if b.PatchBlob == nil { 1014 return fmt.Errorf("missing patchBlob in round %d", idx) 1015 } 1016 1017 ownerPds := ownerId.PDSEndpoint() 1018 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1019 q := url.Query() 1020 q.Set("cid", b.PatchBlob.Ref.String()) 1021 q.Set("did", did) 1022 url.RawQuery = q.Encode() 1023 1024 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1025 if err != nil { 1026 l.Error("failed to create request") 1027 return err 1028 } 1029 req.Header.Set("Content-Type", "application/json") 1030 1031 resp, err := http.DefaultClient.Do(req) 1032 if err != nil { 1033 l.Error("failed to make request") 1034 return err 1035 } 1036 1037 mu.Lock() 1038 readers[idx] = &resp.Body 1039 mu.Unlock() 1040 1041 return nil 1042 }) 1043 } 1044 1045 if err := g.Wait(); err != nil { 1046 for _, r := range readers { 1047 if r != nil && *r != nil { 1048 (*r).Close() 1049 } 1050 } 1051 return err 1052 } 1053 1054 defer func() { 1055 for _, r := range readers { 1056 if r != nil && *r != nil { 1057 (*r).Close() 1058 } 1059 } 1060 }() 1061 1062 pull, err := models.PullFromRecord(did, rkey, record, readers) 1063 if err != nil { 1064 return fmt.Errorf("failed to parse pull from record: %w", err) 1065 } 1066 if err := i.Validator.ValidatePull(pull); err != nil { 1067 return fmt.Errorf("failed to validate pull: %w", err) 1068 } 1069 1070 tx, err := ddb.BeginTx(ctx, nil) 1071 if err != nil { 1072 l.Error("failed to begin transaction", "err", err) 1073 return err 1074 } 1075 defer tx.Rollback() 1076 1077 err = db.PutPull(tx, pull) 1078 if err != nil { 1079 l.Error("failed to create pull", "err", err) 1080 return err 1081 } 1082 1083 err = tx.Commit() 1084 if err != nil { 1085 l.Error("failed to commit txn", "err", err) 1086 return err 1087 } 1088 1089 return nil 1090 1091 case jmodels.CommitOperationDelete: 1092 tx, err := ddb.BeginTx(ctx, nil) 1093 if err != nil { 1094 l.Error("failed to begin transaction", "err", err) 1095 return err 1096 } 1097 defer tx.Rollback() 1098 1099 if err := db.AbandonPulls( 1100 tx, 1101 orm.FilterEq("owner_did", did), 1102 orm.FilterEq("rkey", rkey), 1103 ); err != nil { 1104 l.Error("failed to abandon", "err", err) 1105 return fmt.Errorf("failed to abandon pull record: %w", err) 1106 } 1107 if err := tx.Commit(); err != nil { 1108 l.Error("failed to commit txn", "err", err) 1109 return err 1110 } 1111 1112 return nil 1113 } 1114 1115 return nil 1116} 1117 1118// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1119func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1120 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1121 l.Info("ingesting record") 1122 1123 switch e.Commit.Operation { 1124 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1125 // no-op. sh.tangled.repo.issue.comment is deprecated 1126 1127 case jmodels.CommitOperationDelete: 1128 if err := db.PurgeComments( 1129 i.Db, 1130 orm.FilterEq("did", e.Did), 1131 orm.FilterEq("collection", e.Commit.Collection), 1132 orm.FilterEq("rkey", e.Commit.RKey), 1133 ); err != nil { 1134 return fmt.Errorf("failed to delete comment record: %w", err) 1135 } 1136 } 1137 1138 return nil 1139} 1140 1141// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1142func (i *Ingester) ingestPullComment(e *jmodels.Event) error { 1143 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1144 l.Info("ingesting record") 1145 1146 switch e.Commit.Operation { 1147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1148 // no-op. sh.tangled.repo.pull.comment is deprecated 1149 1150 case jmodels.CommitOperationDelete: 1151 if err := db.PurgeComments( 1152 i.Db, 1153 orm.FilterEq("did", e.Did), 1154 orm.FilterEq("collection", e.Commit.Collection), 1155 orm.FilterEq("rkey", e.Commit.RKey), 1156 ); err != nil { 1157 return fmt.Errorf("failed to delete comment record: %w", err) 1158 } 1159 } 1160 1161 return nil 1162} 1163 1164func (i *Ingester) ingestComment(e *jmodels.Event) error { 1165 did := e.Did 1166 rkey := e.Commit.RKey 1167 cid := e.Commit.CID 1168 1169 var err error 1170 1171 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1172 l.Info("ingesting record") 1173 1174 ddb, ok := i.Db.Execer.(*db.DB) 1175 if !ok { 1176 return fmt.Errorf("failed to index issue comment record, invalid db cast") 1177 } 1178 1179 ctx := context.Background() 1180 1181 switch e.Commit.Operation { 1182 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1183 raw := json.RawMessage(e.Commit.Record) 1184 record := tangled.FeedComment{} 1185 err = json.Unmarshal(raw, &record) 1186 if err != nil { 1187 return fmt.Errorf("invalid record: %w", err) 1188 } 1189 1190 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1191 if err != nil { 1192 return fmt.Errorf("failed to parse comment from record: %w", err) 1193 } 1194 1195 if err := comment.Validate(); err != nil { 1196 return fmt.Errorf("failed to validate comment: %w", err) 1197 } 1198 1199 var mentions []syntax.DID 1200 var references []syntax.ATURI 1201 if comment.Body.Original != nil { 1202 mentions, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1203 } 1204 1205 tx, err := ddb.Begin() 1206 if err != nil { 1207 return fmt.Errorf("failed to start transaction: %w", err) 1208 } 1209 defer tx.Rollback() 1210 1211 err = db.PutComment(tx, comment, references) 1212 if err != nil { 1213 return fmt.Errorf("failed to create comment: %w", err) 1214 } 1215 1216 if err := tx.Commit(); err != nil { 1217 return err 1218 } 1219 1220 if e.Commit.Operation == jmodels.CommitOperationCreate { 1221 i.Notifier.NewComment(ctx, comment, mentions) 1222 } 1223 1224 case jmodels.CommitOperationDelete: 1225 if err := db.DeleteComments( 1226 ddb, 1227 orm.FilterEq("did", did), 1228 orm.FilterEq("collection", e.Commit.Collection), 1229 orm.FilterEq("rkey", rkey), 1230 ); err != nil { 1231 return fmt.Errorf("failed to delete comment record: %w", err) 1232 } 1233 1234 return nil 1235 } 1236 1237 return nil 1238} 1239 1240func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1241 did := e.Did 1242 rkey := e.Commit.RKey 1243 1244 var err error 1245 1246 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1247 l.Info("ingesting record") 1248 1249 ddb, ok := i.Db.Execer.(*db.DB) 1250 if !ok { 1251 return fmt.Errorf("failed to index label definition, invalid db cast") 1252 } 1253 1254 switch e.Commit.Operation { 1255 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1256 raw := json.RawMessage(e.Commit.Record) 1257 record := tangled.LabelDefinition{} 1258 err = json.Unmarshal(raw, &record) 1259 if err != nil { 1260 return fmt.Errorf("invalid record: %w", err) 1261 } 1262 1263 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1264 if err != nil { 1265 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1266 } 1267 1268 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1269 return fmt.Errorf("failed to validate labeldef: %w", err) 1270 } 1271 1272 _, err = db.AddLabelDefinition(ddb, def) 1273 if err != nil { 1274 return fmt.Errorf("failed to create labeldef: %w", err) 1275 } 1276 1277 return nil 1278 1279 case jmodels.CommitOperationDelete: 1280 if err := db.DeleteLabelDefinition( 1281 ddb, 1282 orm.FilterEq("did", did), 1283 orm.FilterEq("rkey", rkey), 1284 ); err != nil { 1285 return fmt.Errorf("failed to delete labeldef record: %w", err) 1286 } 1287 1288 return nil 1289 } 1290 1291 return nil 1292} 1293 1294func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1295 did := e.Did 1296 rkey := e.Commit.RKey 1297 1298 var err error 1299 1300 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1301 l.Info("ingesting record") 1302 1303 ddb, ok := i.Db.Execer.(*db.DB) 1304 if !ok { 1305 return fmt.Errorf("failed to index label op, invalid db cast") 1306 } 1307 1308 switch e.Commit.Operation { 1309 case jmodels.CommitOperationCreate: 1310 raw := json.RawMessage(e.Commit.Record) 1311 record := tangled.LabelOp{} 1312 err = json.Unmarshal(raw, &record) 1313 if err != nil { 1314 return fmt.Errorf("invalid record: %w", err) 1315 } 1316 1317 subject := syntax.ATURI(record.Subject) 1318 collection := subject.Collection() 1319 1320 var repo *models.Repo 1321 switch collection { 1322 case tangled.RepoIssueNSID: 1323 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1324 if err != nil || len(i) != 1 { 1325 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1326 } 1327 repo = i[0].Repo 1328 default: 1329 return fmt.Errorf("unsupported label subject: %s", collection) 1330 } 1331 1332 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1333 if err != nil { 1334 return fmt.Errorf("failed to build label application ctx: %w", err) 1335 } 1336 1337 ops := models.LabelOpsFromRecord(did, rkey, record) 1338 1339 for _, o := range ops { 1340 def, ok := actx.Defs[o.OperandKey] 1341 if !ok { 1342 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1343 } 1344 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1345 return fmt.Errorf("failed to validate labelop: %w", err) 1346 } 1347 } 1348 1349 tx, err := ddb.Begin() 1350 if err != nil { 1351 return err 1352 } 1353 defer tx.Rollback() 1354 1355 for _, o := range ops { 1356 _, err = db.AddLabelOp(tx, &o) 1357 if err != nil { 1358 return fmt.Errorf("failed to add labelop: %w", err) 1359 } 1360 } 1361 1362 if err = tx.Commit(); err != nil { 1363 return err 1364 } 1365 } 1366 1367 return nil 1368}