this repo has no description
at sl/shared-stacks 1145 lines 28 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/appview/validator" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/orm" 25 "tangled.org/core/rbac" 26) 27 28type Ingester struct { 29 Db db.DbWrapper 30 Enforcer *rbac.Enforcer 31 IdResolver *idresolver.Resolver 32 Config *config.Config 33 Logger *slog.Logger 34 Validator *validator.Validator 35} 36 37type processFunc func(ctx context.Context, e *jmodels.Event) error 38 39func (i *Ingester) Ingest() processFunc { 40 return func(ctx context.Context, e *jmodels.Event) error { 41 var err error 42 defer func() { 43 eventTime := e.TimeUS 44 lastTimeUs := eventTime + 1 45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 47 } 48 }() 49 50 l := i.Logger.With("kind", e.Kind) 51 switch e.Kind { 52 case jmodels.EventKindAccount: 53 if !e.Account.Active && *e.Account.Status == "deactivated" { 54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 55 } 56 case jmodels.EventKindIdentity: 57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 58 case jmodels.EventKindCommit: 59 switch e.Commit.Collection { 60 case tangled.GraphFollowNSID: 61 err = i.ingestFollow(e) 62 case tangled.FeedStarNSID: 63 err = i.ingestStar(e) 64 case tangled.PublicKeyNSID: 65 err = i.ingestPublicKey(e) 66 case tangled.RepoArtifactNSID: 67 err = i.ingestArtifact(e) 68 case tangled.ActorProfileNSID: 69 err = i.ingestProfile(e) 70 case tangled.SpindleMemberNSID: 71 err = i.ingestSpindleMember(ctx, e) 72 case tangled.SpindleNSID: 73 err = i.ingestSpindle(ctx, e) 74 case tangled.KnotMemberNSID: 75 err = i.ingestKnotMember(e) 76 case tangled.KnotNSID: 77 err = i.ingestKnot(e) 78 case tangled.StringNSID: 79 err = i.ingestString(e) 80 case tangled.RepoIssueNSID: 81 err = i.ingestIssue(ctx, e) 82 case tangled.CommentNSID: 83 err = i.ingestComment(e) 84 case tangled.RepoIssueCommentNSID: 85 err = i.ingestIssueComment(e) 86 case tangled.LabelDefinitionNSID: 87 err = i.ingestLabelDefinition(e) 88 case tangled.LabelOpNSID: 89 err = i.ingestLabelOp(e) 90 } 91 l = i.Logger.With("nsid", e.Commit.Collection) 92 } 93 94 if err != nil { 95 l.Warn("refused to ingest record", "err", err) 96 } 97 98 return nil 99 } 100} 101 102func (i *Ingester) ingestStar(e *jmodels.Event) error { 103 var err error 104 did := e.Did 105 106 l := i.Logger.With("handler", "ingestStar") 107 l = l.With("nsid", e.Commit.Collection) 108 109 switch e.Commit.Operation { 110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 111 var subjectUri syntax.ATURI 112 113 raw := json.RawMessage(e.Commit.Record) 114 record := tangled.FeedStar{} 115 err := json.Unmarshal(raw, &record) 116 if err != nil { 117 l.Error("invalid record", "err", err) 118 return err 119 } 120 121 subjectUri, err = syntax.ParseATURI(record.Subject) 122 if err != nil { 123 l.Error("invalid record", "err", err) 124 return err 125 } 126 err = db.AddStar(i.Db, &models.Star{ 127 Did: did, 128 RepoAt: subjectUri, 129 Rkey: e.Commit.RKey, 130 }) 131 case jmodels.CommitOperationDelete: 132 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 133 } 134 135 if err != nil { 136 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 137 } 138 139 return nil 140} 141 142func (i *Ingester) ingestFollow(e *jmodels.Event) error { 143 var err error 144 did := e.Did 145 146 l := i.Logger.With("handler", "ingestFollow") 147 l = l.With("nsid", e.Commit.Collection) 148 149 switch e.Commit.Operation { 150 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 151 raw := json.RawMessage(e.Commit.Record) 152 record := tangled.GraphFollow{} 153 err = json.Unmarshal(raw, &record) 154 if err != nil { 155 l.Error("invalid record", "err", err) 156 return err 157 } 158 159 err = db.AddFollow(i.Db, &models.Follow{ 160 UserDid: did, 161 SubjectDid: record.Subject, 162 Rkey: e.Commit.RKey, 163 }) 164 case jmodels.CommitOperationDelete: 165 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 166 } 167 168 if err != nil { 169 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 170 } 171 172 return nil 173} 174 175func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 176 did := e.Did 177 var err error 178 179 l := i.Logger.With("handler", "ingestPublicKey") 180 l = l.With("nsid", e.Commit.Collection) 181 182 switch e.Commit.Operation { 183 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 184 l.Debug("processing add of pubkey") 185 raw := json.RawMessage(e.Commit.Record) 186 record := tangled.PublicKey{} 187 err = json.Unmarshal(raw, &record) 188 if err != nil { 189 l.Error("invalid record", "err", err) 190 return err 191 } 192 193 name := record.Name 194 key := record.Key 195 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 196 case jmodels.CommitOperationDelete: 197 l.Debug("processing delete of pubkey") 198 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 199 } 200 201 if err != nil { 202 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 203 } 204 205 return nil 206} 207 208func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 209 did := e.Did 210 var err error 211 212 l := i.Logger.With("handler", "ingestArtifact") 213 l = l.With("nsid", e.Commit.Collection) 214 215 switch e.Commit.Operation { 216 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 217 raw := json.RawMessage(e.Commit.Record) 218 record := tangled.RepoArtifact{} 219 err = json.Unmarshal(raw, &record) 220 if err != nil { 221 l.Error("invalid record", "err", err) 222 return err 223 } 224 225 repoAt, err := syntax.ParseATURI(record.Repo) 226 if err != nil { 227 return err 228 } 229 230 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 231 if err != nil { 232 return err 233 } 234 235 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 236 if err != nil || !ok { 237 return err 238 } 239 240 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 241 if err != nil { 242 createdAt = time.Now() 243 } 244 245 artifact := models.Artifact{ 246 Did: did, 247 Rkey: e.Commit.RKey, 248 RepoAt: repoAt, 249 Tag: plumbing.Hash(record.Tag), 250 CreatedAt: createdAt, 251 BlobCid: cid.Cid(record.Artifact.Ref), 252 Name: record.Name, 253 Size: uint64(record.Artifact.Size), 254 MimeType: record.Artifact.MimeType, 255 } 256 257 err = db.AddArtifact(i.Db, artifact) 258 case jmodels.CommitOperationDelete: 259 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 260 } 261 262 if err != nil { 263 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 264 } 265 266 return nil 267} 268 269func (i *Ingester) ingestProfile(e *jmodels.Event) error { 270 did := e.Did 271 var err error 272 273 l := i.Logger.With("handler", "ingestProfile") 274 l = l.With("nsid", e.Commit.Collection) 275 276 if e.Commit.RKey != "self" { 277 return fmt.Errorf("ingestProfile only ingests `self` record") 278 } 279 280 switch e.Commit.Operation { 281 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 282 raw := json.RawMessage(e.Commit.Record) 283 record := tangled.ActorProfile{} 284 err = json.Unmarshal(raw, &record) 285 if err != nil { 286 l.Error("invalid record", "err", err) 287 return err 288 } 289 290 avatar := "" 291 if record.Avatar != nil { 292 avatar = record.Avatar.Ref.String() 293 } 294 295 description := "" 296 if record.Description != nil { 297 description = *record.Description 298 } 299 300 includeBluesky := record.Bluesky 301 302 pronouns := "" 303 if record.Pronouns != nil { 304 pronouns = *record.Pronouns 305 } 306 307 location := "" 308 if record.Location != nil { 309 location = *record.Location 310 } 311 312 var links [5]string 313 for i, l := range record.Links { 314 if i < 5 { 315 links[i] = l 316 } 317 } 318 319 var stats [2]models.VanityStat 320 for i, s := range record.Stats { 321 if i < 2 { 322 stats[i].Kind = models.VanityStatKind(s) 323 } 324 } 325 326 var pinned [6]syntax.ATURI 327 for i, r := range record.PinnedRepositories { 328 if i < 6 { 329 pinned[i] = syntax.ATURI(r) 330 } 331 } 332 333 profile := models.Profile{ 334 Did: did, 335 Avatar: avatar, 336 Description: description, 337 IncludeBluesky: includeBluesky, 338 Location: location, 339 Links: links, 340 Stats: stats, 341 PinnedRepos: pinned, 342 Pronouns: pronouns, 343 } 344 345 ddb, ok := i.Db.Execer.(*db.DB) 346 if !ok { 347 return fmt.Errorf("failed to index profile record, invalid db cast") 348 } 349 350 tx, err := ddb.Begin() 351 if err != nil { 352 return fmt.Errorf("failed to start transaction") 353 } 354 355 err = db.ValidateProfile(tx, &profile) 356 if err != nil { 357 return fmt.Errorf("invalid profile record") 358 } 359 360 err = db.UpsertProfile(tx, &profile) 361 case jmodels.CommitOperationDelete: 362 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 363 } 364 365 if err != nil { 366 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 367 } 368 369 return nil 370} 371 372func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 373 did := e.Did 374 var err error 375 376 l := i.Logger.With("handler", "ingestSpindleMember") 377 l = l.With("nsid", e.Commit.Collection) 378 379 switch e.Commit.Operation { 380 case jmodels.CommitOperationCreate: 381 raw := json.RawMessage(e.Commit.Record) 382 record := tangled.SpindleMember{} 383 err = json.Unmarshal(raw, &record) 384 if err != nil { 385 l.Error("invalid record", "err", err) 386 return err 387 } 388 389 // only spindle owner can invite to spindles 390 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 391 if err != nil || !ok { 392 return fmt.Errorf("failed to enforce permissions: %w", err) 393 } 394 395 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 396 if err != nil { 397 return err 398 } 399 400 if memberId.Handle.IsInvalidHandle() { 401 return err 402 } 403 404 ddb, ok := i.Db.Execer.(*db.DB) 405 if !ok { 406 return fmt.Errorf("failed to index profile record, invalid db cast") 407 } 408 409 err = db.AddSpindleMember(ddb, models.SpindleMember{ 410 Did: syntax.DID(did), 411 Rkey: e.Commit.RKey, 412 Instance: record.Instance, 413 Subject: memberId.DID, 414 }) 415 if !ok { 416 return fmt.Errorf("failed to add to db: %w", err) 417 } 418 419 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 420 if err != nil { 421 return fmt.Errorf("failed to update ACLs: %w", err) 422 } 423 424 l.Info("added spindle member") 425 case jmodels.CommitOperationDelete: 426 rkey := e.Commit.RKey 427 428 ddb, ok := i.Db.Execer.(*db.DB) 429 if !ok { 430 return fmt.Errorf("failed to index profile record, invalid db cast") 431 } 432 433 // get record from db first 434 members, err := db.GetSpindleMembers( 435 ddb, 436 orm.FilterEq("did", did), 437 orm.FilterEq("rkey", rkey), 438 ) 439 if err != nil || len(members) != 1 { 440 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 441 } 442 member := members[0] 443 444 tx, err := ddb.Begin() 445 if err != nil { 446 return fmt.Errorf("failed to start txn: %w", err) 447 } 448 449 // remove record by rkey && update enforcer 450 if err = db.RemoveSpindleMember( 451 tx, 452 orm.FilterEq("did", did), 453 orm.FilterEq("rkey", rkey), 454 ); err != nil { 455 return fmt.Errorf("failed to remove from db: %w", err) 456 } 457 458 // update enforcer 459 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 460 if err != nil { 461 return fmt.Errorf("failed to update ACLs: %w", err) 462 } 463 464 if err = tx.Commit(); err != nil { 465 return fmt.Errorf("failed to commit txn: %w", err) 466 } 467 468 if err = i.Enforcer.E.SavePolicy(); err != nil { 469 return fmt.Errorf("failed to save ACLs: %w", err) 470 } 471 472 l.Info("removed spindle member") 473 } 474 475 return nil 476} 477 478func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 479 did := e.Did 480 var err error 481 482 l := i.Logger.With("handler", "ingestSpindle") 483 l = l.With("nsid", e.Commit.Collection) 484 485 switch e.Commit.Operation { 486 case jmodels.CommitOperationCreate: 487 raw := json.RawMessage(e.Commit.Record) 488 record := tangled.Spindle{} 489 err = json.Unmarshal(raw, &record) 490 if err != nil { 491 l.Error("invalid record", "err", err) 492 return err 493 } 494 495 instance := e.Commit.RKey 496 497 ddb, ok := i.Db.Execer.(*db.DB) 498 if !ok { 499 return fmt.Errorf("failed to index profile record, invalid db cast") 500 } 501 502 err := db.AddSpindle(ddb, models.Spindle{ 503 Owner: syntax.DID(did), 504 Instance: instance, 505 }) 506 if err != nil { 507 l.Error("failed to add spindle to db", "err", err, "instance", instance) 508 return err 509 } 510 511 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 512 if err != nil { 513 l.Error("failed to add spindle to db", "err", err, "instance", instance) 514 return err 515 } 516 517 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 518 if err != nil { 519 return fmt.Errorf("failed to mark verified: %w", err) 520 } 521 522 return nil 523 524 case jmodels.CommitOperationDelete: 525 instance := e.Commit.RKey 526 527 ddb, ok := i.Db.Execer.(*db.DB) 528 if !ok { 529 return fmt.Errorf("failed to index profile record, invalid db cast") 530 } 531 532 // get record from db first 533 spindles, err := db.GetSpindles( 534 ddb, 535 orm.FilterEq("owner", did), 536 orm.FilterEq("instance", instance), 537 ) 538 if err != nil || len(spindles) != 1 { 539 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 540 } 541 spindle := spindles[0] 542 543 tx, err := ddb.Begin() 544 if err != nil { 545 return err 546 } 547 defer func() { 548 tx.Rollback() 549 i.Enforcer.E.LoadPolicy() 550 }() 551 552 // remove spindle members first 553 err = db.RemoveSpindleMember( 554 tx, 555 orm.FilterEq("owner", did), 556 orm.FilterEq("instance", instance), 557 ) 558 if err != nil { 559 return err 560 } 561 562 err = db.DeleteSpindle( 563 tx, 564 orm.FilterEq("owner", did), 565 orm.FilterEq("instance", instance), 566 ) 567 if err != nil { 568 return err 569 } 570 571 if spindle.Verified != nil { 572 err = i.Enforcer.RemoveSpindle(instance) 573 if err != nil { 574 return err 575 } 576 } 577 578 err = tx.Commit() 579 if err != nil { 580 return err 581 } 582 583 err = i.Enforcer.E.SavePolicy() 584 if err != nil { 585 return err 586 } 587 } 588 589 return nil 590} 591 592func (i *Ingester) ingestString(e *jmodels.Event) error { 593 did := e.Did 594 rkey := e.Commit.RKey 595 596 var err error 597 598 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 599 l.Info("ingesting record") 600 601 ddb, ok := i.Db.Execer.(*db.DB) 602 if !ok { 603 return fmt.Errorf("failed to index string record, invalid db cast") 604 } 605 606 switch e.Commit.Operation { 607 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 608 raw := json.RawMessage(e.Commit.Record) 609 record := tangled.String{} 610 err = json.Unmarshal(raw, &record) 611 if err != nil { 612 l.Error("invalid record", "err", err) 613 return err 614 } 615 616 string := models.StringFromRecord(did, rkey, record) 617 618 if err = i.Validator.ValidateString(&string); err != nil { 619 l.Error("invalid record", "err", err) 620 return err 621 } 622 623 if err = db.AddString(ddb, string); err != nil { 624 l.Error("failed to add string", "err", err) 625 return err 626 } 627 628 return nil 629 630 case jmodels.CommitOperationDelete: 631 if err := db.DeleteString( 632 ddb, 633 orm.FilterEq("did", did), 634 orm.FilterEq("rkey", rkey), 635 ); err != nil { 636 l.Error("failed to delete", "err", err) 637 return fmt.Errorf("failed to delete string record: %w", err) 638 } 639 640 return nil 641 } 642 643 return nil 644} 645 646func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 647 did := e.Did 648 var err error 649 650 l := i.Logger.With("handler", "ingestKnotMember") 651 l = l.With("nsid", e.Commit.Collection) 652 653 switch e.Commit.Operation { 654 case jmodels.CommitOperationCreate: 655 raw := json.RawMessage(e.Commit.Record) 656 record := tangled.KnotMember{} 657 err = json.Unmarshal(raw, &record) 658 if err != nil { 659 l.Error("invalid record", "err", err) 660 return err 661 } 662 663 // only knot owner can invite to knots 664 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 665 if err != nil || !ok { 666 return fmt.Errorf("failed to enforce permissions: %w", err) 667 } 668 669 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 670 if err != nil { 671 return err 672 } 673 674 if memberId.Handle.IsInvalidHandle() { 675 return err 676 } 677 678 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 679 if err != nil { 680 return fmt.Errorf("failed to update ACLs: %w", err) 681 } 682 683 l.Info("added knot member") 684 case jmodels.CommitOperationDelete: 685 // we don't store knot members in a table (like we do for spindle) 686 // and we can't remove this just yet. possibly fixed if we switch 687 // to either: 688 // 1. a knot_members table like with spindle and store the rkey 689 // 2. use the knot host as the rkey 690 // 691 // TODO: implement member deletion 692 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 693 } 694 695 return nil 696} 697 698func (i *Ingester) ingestKnot(e *jmodels.Event) error { 699 did := e.Did 700 var err error 701 702 l := i.Logger.With("handler", "ingestKnot") 703 l = l.With("nsid", e.Commit.Collection) 704 705 switch e.Commit.Operation { 706 case jmodels.CommitOperationCreate: 707 raw := json.RawMessage(e.Commit.Record) 708 record := tangled.Knot{} 709 err = json.Unmarshal(raw, &record) 710 if err != nil { 711 l.Error("invalid record", "err", err) 712 return err 713 } 714 715 domain := e.Commit.RKey 716 717 ddb, ok := i.Db.Execer.(*db.DB) 718 if !ok { 719 return fmt.Errorf("failed to index profile record, invalid db cast") 720 } 721 722 err := db.AddKnot(ddb, domain, did) 723 if err != nil { 724 l.Error("failed to add knot to db", "err", err, "domain", domain) 725 return err 726 } 727 728 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 729 if err != nil { 730 l.Error("failed to verify knot", "err", err, "domain", domain) 731 return err 732 } 733 734 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 735 if err != nil { 736 return fmt.Errorf("failed to mark verified: %w", err) 737 } 738 739 return nil 740 741 case jmodels.CommitOperationDelete: 742 domain := e.Commit.RKey 743 744 ddb, ok := i.Db.Execer.(*db.DB) 745 if !ok { 746 return fmt.Errorf("failed to index knot record, invalid db cast") 747 } 748 749 // get record from db first 750 registrations, err := db.GetRegistrations( 751 ddb, 752 orm.FilterEq("domain", domain), 753 orm.FilterEq("did", did), 754 ) 755 if err != nil { 756 return fmt.Errorf("failed to get registration: %w", err) 757 } 758 if len(registrations) != 1 { 759 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 760 } 761 registration := registrations[0] 762 763 tx, err := ddb.Begin() 764 if err != nil { 765 return err 766 } 767 defer func() { 768 tx.Rollback() 769 i.Enforcer.E.LoadPolicy() 770 }() 771 772 err = db.DeleteKnot( 773 tx, 774 orm.FilterEq("did", did), 775 orm.FilterEq("domain", domain), 776 ) 777 if err != nil { 778 return err 779 } 780 781 if registration.Registered != nil { 782 err = i.Enforcer.RemoveKnot(domain) 783 if err != nil { 784 return err 785 } 786 } 787 788 err = tx.Commit() 789 if err != nil { 790 return err 791 } 792 793 err = i.Enforcer.E.SavePolicy() 794 if err != nil { 795 return err 796 } 797 } 798 799 return nil 800} 801func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 802 did := e.Did 803 rkey := e.Commit.RKey 804 805 var err error 806 807 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 808 l.Info("ingesting record") 809 810 ddb, ok := i.Db.Execer.(*db.DB) 811 if !ok { 812 return fmt.Errorf("failed to index issue record, invalid db cast") 813 } 814 815 switch e.Commit.Operation { 816 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 817 raw := json.RawMessage(e.Commit.Record) 818 record := tangled.RepoIssue{} 819 err = json.Unmarshal(raw, &record) 820 if err != nil { 821 l.Error("invalid record", "err", err) 822 return err 823 } 824 825 issue := models.IssueFromRecord(did, rkey, record) 826 827 if err := i.Validator.ValidateIssue(&issue); err != nil { 828 return fmt.Errorf("failed to validate issue: %w", err) 829 } 830 831 tx, err := ddb.BeginTx(ctx, nil) 832 if err != nil { 833 l.Error("failed to begin transaction", "err", err) 834 return err 835 } 836 defer tx.Rollback() 837 838 err = db.PutIssue(tx, &issue) 839 if err != nil { 840 l.Error("failed to create issue", "err", err) 841 return err 842 } 843 844 err = tx.Commit() 845 if err != nil { 846 l.Error("failed to commit txn", "err", err) 847 return err 848 } 849 850 return nil 851 852 case jmodels.CommitOperationDelete: 853 tx, err := ddb.BeginTx(ctx, nil) 854 if err != nil { 855 l.Error("failed to begin transaction", "err", err) 856 return err 857 } 858 defer tx.Rollback() 859 860 if err := db.DeleteIssues( 861 tx, 862 did, 863 rkey, 864 ); err != nil { 865 l.Error("failed to delete", "err", err) 866 return fmt.Errorf("failed to delete issue record: %w", err) 867 } 868 if err := tx.Commit(); err != nil { 869 l.Error("failed to commit txn", "err", err) 870 return err 871 } 872 873 return nil 874 } 875 876 return nil 877} 878 879func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 880 did := e.Did 881 rkey := e.Commit.RKey 882 883 var err error 884 885 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 886 l.Info("ingesting record") 887 888 ddb, ok := i.Db.Execer.(*db.DB) 889 if !ok { 890 return fmt.Errorf("failed to index issue comment record, invalid db cast") 891 } 892 893 switch e.Commit.Operation { 894 case jmodels.CommitOperationUpdate: 895 raw := json.RawMessage(e.Commit.Record) 896 record := tangled.RepoIssueComment{} 897 err = json.Unmarshal(raw, &record) 898 if err != nil { 899 return fmt.Errorf("invalid record: %w", err) 900 } 901 902 // convert 'sh.tangled.repo.issue.comment' to 'sh.tangled.comment' 903 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), tangled.Comment{ 904 Body: record.Body, 905 CreatedAt: record.CreatedAt, 906 Mentions: record.Mentions, 907 References: record.References, 908 ReplyTo: record.ReplyTo, 909 Subject: record.Issue, 910 }) 911 if err != nil { 912 return fmt.Errorf("failed to parse comment from record: %w", err) 913 } 914 915 if err := comment.Validate(); err != nil { 916 return fmt.Errorf("failed to validate comment: %w", err) 917 } 918 919 tx, err := ddb.Begin() 920 if err != nil { 921 return fmt.Errorf("failed to start transaction: %w", err) 922 } 923 defer tx.Rollback() 924 925 err = db.PutComment(tx, comment) 926 if err != nil { 927 return fmt.Errorf("failed to create comment: %w", err) 928 } 929 930 return tx.Commit() 931 932 case jmodels.CommitOperationDelete: 933 if err := db.DeleteComments( 934 ddb, 935 orm.FilterEq("did", did), 936 orm.FilterEq("collection", e.Commit.Collection), 937 orm.FilterEq("rkey", rkey), 938 ); err != nil { 939 return fmt.Errorf("failed to delete issue comment record: %w", err) 940 } 941 942 return nil 943 } 944 945 return nil 946} 947 948func (i *Ingester) ingestComment(e *jmodels.Event) error { 949 did := e.Did 950 rkey := e.Commit.RKey 951 952 var err error 953 954 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 955 l.Info("ingesting record") 956 957 ddb, ok := i.Db.Execer.(*db.DB) 958 if !ok { 959 return fmt.Errorf("failed to index issue comment record, invalid db cast") 960 } 961 962 switch e.Commit.Operation { 963 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 964 raw := json.RawMessage(e.Commit.Record) 965 record := tangled.Comment{} 966 err = json.Unmarshal(raw, &record) 967 if err != nil { 968 return fmt.Errorf("invalid record: %w", err) 969 } 970 971 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), record) 972 if err != nil { 973 return fmt.Errorf("failed to parse comment from record: %w", err) 974 } 975 976 // TODO: ingest pull comments 977 // we aren't ingesting pull comments yet because pull itself isn't fully atprotated. 978 // so we cannot know which round this comment is pointing to 979 if comment.Subject.Collection().String() == tangled.RepoPullNSID { 980 l.Info("skip ingesting pull comments") 981 return nil 982 } 983 984 if err := comment.Validate(); err != nil { 985 return fmt.Errorf("failed to validate comment: %w", err) 986 } 987 988 tx, err := ddb.Begin() 989 if err != nil { 990 return fmt.Errorf("failed to start transaction: %w", err) 991 } 992 defer tx.Rollback() 993 994 err = db.PutComment(tx, comment) 995 if err != nil { 996 return fmt.Errorf("failed to create comment: %w", err) 997 } 998 999 return tx.Commit() 1000 1001 case jmodels.CommitOperationDelete: 1002 if err := db.DeleteComments( 1003 ddb, 1004 orm.FilterEq("did", did), 1005 orm.FilterEq("collection", e.Commit.Collection), 1006 orm.FilterEq("rkey", rkey), 1007 ); err != nil { 1008 return fmt.Errorf("failed to delete comment record: %w", err) 1009 } 1010 1011 return nil 1012 } 1013 1014 return nil 1015} 1016 1017func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1018 did := e.Did 1019 rkey := e.Commit.RKey 1020 1021 var err error 1022 1023 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1024 l.Info("ingesting record") 1025 1026 ddb, ok := i.Db.Execer.(*db.DB) 1027 if !ok { 1028 return fmt.Errorf("failed to index label definition, invalid db cast") 1029 } 1030 1031 switch e.Commit.Operation { 1032 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1033 raw := json.RawMessage(e.Commit.Record) 1034 record := tangled.LabelDefinition{} 1035 err = json.Unmarshal(raw, &record) 1036 if err != nil { 1037 return fmt.Errorf("invalid record: %w", err) 1038 } 1039 1040 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1041 if err != nil { 1042 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1043 } 1044 1045 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1046 return fmt.Errorf("failed to validate labeldef: %w", err) 1047 } 1048 1049 _, err = db.AddLabelDefinition(ddb, def) 1050 if err != nil { 1051 return fmt.Errorf("failed to create labeldef: %w", err) 1052 } 1053 1054 return nil 1055 1056 case jmodels.CommitOperationDelete: 1057 if err := db.DeleteLabelDefinition( 1058 ddb, 1059 orm.FilterEq("did", did), 1060 orm.FilterEq("rkey", rkey), 1061 ); err != nil { 1062 return fmt.Errorf("failed to delete labeldef record: %w", err) 1063 } 1064 1065 return nil 1066 } 1067 1068 return nil 1069} 1070 1071func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1072 did := e.Did 1073 rkey := e.Commit.RKey 1074 1075 var err error 1076 1077 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1078 l.Info("ingesting record") 1079 1080 ddb, ok := i.Db.Execer.(*db.DB) 1081 if !ok { 1082 return fmt.Errorf("failed to index label op, invalid db cast") 1083 } 1084 1085 switch e.Commit.Operation { 1086 case jmodels.CommitOperationCreate: 1087 raw := json.RawMessage(e.Commit.Record) 1088 record := tangled.LabelOp{} 1089 err = json.Unmarshal(raw, &record) 1090 if err != nil { 1091 return fmt.Errorf("invalid record: %w", err) 1092 } 1093 1094 subject := syntax.ATURI(record.Subject) 1095 collection := subject.Collection() 1096 1097 var repo *models.Repo 1098 switch collection { 1099 case tangled.RepoIssueNSID: 1100 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1101 if err != nil || len(i) != 1 { 1102 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1103 } 1104 repo = i[0].Repo 1105 default: 1106 return fmt.Errorf("unsupport label subject: %s", collection) 1107 } 1108 1109 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1110 if err != nil { 1111 return fmt.Errorf("failed to build label application ctx: %w", err) 1112 } 1113 1114 ops := models.LabelOpsFromRecord(did, rkey, record) 1115 1116 for _, o := range ops { 1117 def, ok := actx.Defs[o.OperandKey] 1118 if !ok { 1119 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1120 } 1121 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1122 return fmt.Errorf("failed to validate labelop: %w", err) 1123 } 1124 } 1125 1126 tx, err := ddb.Begin() 1127 if err != nil { 1128 return err 1129 } 1130 defer tx.Rollback() 1131 1132 for _, o := range ops { 1133 _, err = db.AddLabelOp(tx, &o) 1134 if err != nil { 1135 return fmt.Errorf("failed to add labelop: %w", err) 1136 } 1137 } 1138 1139 if err = tx.Commit(); err != nil { 1140 return err 1141 } 1142 } 1143 1144 return nil 1145}