Monorepo for Tangled tangled.org
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/appview/validator" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/orm" 25 "tangled.org/core/rbac" 26) 27 28type Ingester struct { 29 Db db.DbWrapper 30 Enforcer *rbac.Enforcer 31 IdResolver *idresolver.Resolver 32 Config *config.Config 33 Logger *slog.Logger 34 Validator *validator.Validator 35} 36 37type processFunc func(ctx context.Context, e *jmodels.Event) error 38 39func (i *Ingester) Ingest() processFunc { 40 return func(ctx context.Context, e *jmodels.Event) error { 41 var err error 42 defer func() { 43 eventTime := e.TimeUS 44 lastTimeUs := eventTime + 1 45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 47 } 48 }() 49 50 l := i.Logger.With("kind", e.Kind) 51 switch e.Kind { 52 case jmodels.EventKindAccount: 53 if !e.Account.Active && *e.Account.Status == "deactivated" { 54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 55 } 56 case jmodels.EventKindIdentity: 57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 58 case jmodels.EventKindCommit: 59 switch e.Commit.Collection { 60 case tangled.GraphFollowNSID: 61 err = i.ingestFollow(e) 62 case tangled.FeedStarNSID: 63 err = i.ingestStar(e) 64 case tangled.PublicKeyNSID: 65 err = i.ingestPublicKey(e) 66 case tangled.RepoArtifactNSID: 67 err = i.ingestArtifact(e) 68 case tangled.ActorProfileNSID: 69 err = i.ingestProfile(e) 70 case tangled.SpindleMemberNSID: 71 err = i.ingestSpindleMember(ctx, e) 72 case tangled.SpindleNSID: 73 err = i.ingestSpindle(ctx, e) 74 case tangled.KnotMemberNSID: 75 err = i.ingestKnotMember(e) 76 case tangled.KnotNSID: 77 err = i.ingestKnot(e) 78 case tangled.StringNSID: 79 err = i.ingestString(e) 80 case tangled.RepoIssueNSID: 81 err = i.ingestIssue(ctx, e) 82 case tangled.RepoIssueCommentNSID: 83 err = i.ingestIssueComment(e) 84 case tangled.LabelDefinitionNSID: 85 err = i.ingestLabelDefinition(e) 86 case tangled.LabelOpNSID: 87 err = i.ingestLabelOp(e) 88 } 89 l = i.Logger.With("nsid", e.Commit.Collection) 90 } 91 92 if err != nil { 93 l.Warn("refused to ingest record", "err", err) 94 } 95 96 return nil 97 } 98} 99 100func (i *Ingester) ingestStar(e *jmodels.Event) error { 101 var err error 102 did := e.Did 103 104 l := i.Logger.With("handler", "ingestStar") 105 l = l.With("nsid", e.Commit.Collection) 106 107 switch e.Commit.Operation { 108 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 109 var subjectUri syntax.ATURI 110 111 raw := json.RawMessage(e.Commit.Record) 112 record := tangled.FeedStar{} 113 err := json.Unmarshal(raw, &record) 114 if err != nil { 115 l.Error("invalid record", "err", err) 116 return err 117 } 118 119 subjectUri, err = syntax.ParseATURI(record.Subject) 120 if err != nil { 121 l.Error("invalid record", "err", err) 122 return err 123 } 124 err = db.AddStar(i.Db, &models.Star{ 125 Did: did, 126 RepoAt: subjectUri, 127 Rkey: e.Commit.RKey, 128 }) 129 case jmodels.CommitOperationDelete: 130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 131 } 132 133 if err != nil { 134 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 135 } 136 137 return nil 138} 139 140func (i *Ingester) ingestFollow(e *jmodels.Event) error { 141 var err error 142 did := e.Did 143 144 l := i.Logger.With("handler", "ingestFollow") 145 l = l.With("nsid", e.Commit.Collection) 146 147 switch e.Commit.Operation { 148 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 149 raw := json.RawMessage(e.Commit.Record) 150 record := tangled.GraphFollow{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 l.Error("invalid record", "err", err) 154 return err 155 } 156 157 err = db.AddFollow(i.Db, &models.Follow{ 158 UserDid: did, 159 SubjectDid: record.Subject, 160 Rkey: e.Commit.RKey, 161 }) 162 case jmodels.CommitOperationDelete: 163 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 164 } 165 166 if err != nil { 167 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 168 } 169 170 return nil 171} 172 173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 174 did := e.Did 175 var err error 176 177 l := i.Logger.With("handler", "ingestPublicKey") 178 l = l.With("nsid", e.Commit.Collection) 179 180 switch e.Commit.Operation { 181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 182 l.Debug("processing add of pubkey") 183 raw := json.RawMessage(e.Commit.Record) 184 record := tangled.PublicKey{} 185 err = json.Unmarshal(raw, &record) 186 if err != nil { 187 l.Error("invalid record", "err", err) 188 return err 189 } 190 191 name := record.Name 192 key := record.Key 193 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 194 case jmodels.CommitOperationDelete: 195 l.Debug("processing delete of pubkey") 196 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 197 } 198 199 if err != nil { 200 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 201 } 202 203 return nil 204} 205 206func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 207 did := e.Did 208 var err error 209 210 l := i.Logger.With("handler", "ingestArtifact") 211 l = l.With("nsid", e.Commit.Collection) 212 213 switch e.Commit.Operation { 214 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 215 raw := json.RawMessage(e.Commit.Record) 216 record := tangled.RepoArtifact{} 217 err = json.Unmarshal(raw, &record) 218 if err != nil { 219 l.Error("invalid record", "err", err) 220 return err 221 } 222 223 repoAt, err := syntax.ParseATURI(record.Repo) 224 if err != nil { 225 return err 226 } 227 228 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 229 if err != nil { 230 return err 231 } 232 233 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 234 if err != nil || !ok { 235 return err 236 } 237 238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 239 if err != nil { 240 createdAt = time.Now() 241 } 242 243 artifact := models.Artifact{ 244 Did: did, 245 Rkey: e.Commit.RKey, 246 RepoAt: repoAt, 247 Tag: plumbing.Hash(record.Tag), 248 CreatedAt: createdAt, 249 BlobCid: cid.Cid(record.Artifact.Ref), 250 Name: record.Name, 251 Size: uint64(record.Artifact.Size), 252 MimeType: record.Artifact.MimeType, 253 } 254 255 err = db.AddArtifact(i.Db, artifact) 256 case jmodels.CommitOperationDelete: 257 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 258 } 259 260 if err != nil { 261 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 262 } 263 264 return nil 265} 266 267func (i *Ingester) ingestProfile(e *jmodels.Event) error { 268 did := e.Did 269 var err error 270 271 l := i.Logger.With("handler", "ingestProfile") 272 l = l.With("nsid", e.Commit.Collection) 273 274 if e.Commit.RKey != "self" { 275 return fmt.Errorf("ingestProfile only ingests `self` record") 276 } 277 278 switch e.Commit.Operation { 279 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 280 raw := json.RawMessage(e.Commit.Record) 281 record := tangled.ActorProfile{} 282 err = json.Unmarshal(raw, &record) 283 if err != nil { 284 l.Error("invalid record", "err", err) 285 return err 286 } 287 288 avatar := "" 289 if record.Avatar != nil { 290 avatar = record.Avatar.Ref.String() 291 } 292 293 description := "" 294 if record.Description != nil { 295 description = *record.Description 296 } 297 298 includeBluesky := record.Bluesky 299 300 pronouns := "" 301 if record.Pronouns != nil { 302 pronouns = *record.Pronouns 303 } 304 305 location := "" 306 if record.Location != nil { 307 location = *record.Location 308 } 309 310 var links [5]string 311 for i, l := range record.Links { 312 if i < 5 { 313 links[i] = l 314 } 315 } 316 317 var stats [2]models.VanityStat 318 for i, s := range record.Stats { 319 if i < 2 { 320 stats[i].Kind = models.VanityStatKind(s) 321 } 322 } 323 324 var pinned [6]syntax.ATURI 325 for i, r := range record.PinnedRepositories { 326 if i < 6 { 327 pinned[i] = syntax.ATURI(r) 328 } 329 } 330 331 profile := models.Profile{ 332 Did: did, 333 Avatar: avatar, 334 Description: description, 335 IncludeBluesky: includeBluesky, 336 Location: location, 337 Links: links, 338 Stats: stats, 339 PinnedRepos: pinned, 340 Pronouns: pronouns, 341 } 342 343 ddb, ok := i.Db.Execer.(*db.DB) 344 if !ok { 345 return fmt.Errorf("failed to index profile record, invalid db cast") 346 } 347 348 tx, err := ddb.Begin() 349 if err != nil { 350 return fmt.Errorf("failed to start transaction") 351 } 352 353 err = db.ValidateProfile(tx, &profile) 354 if err != nil { 355 return fmt.Errorf("invalid profile record") 356 } 357 358 err = db.UpsertProfile(tx, &profile) 359 case jmodels.CommitOperationDelete: 360 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 361 } 362 363 if err != nil { 364 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 365 } 366 367 return nil 368} 369 370func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 371 did := e.Did 372 var err error 373 374 l := i.Logger.With("handler", "ingestSpindleMember") 375 l = l.With("nsid", e.Commit.Collection) 376 377 switch e.Commit.Operation { 378 case jmodels.CommitOperationCreate: 379 raw := json.RawMessage(e.Commit.Record) 380 record := tangled.SpindleMember{} 381 err = json.Unmarshal(raw, &record) 382 if err != nil { 383 l.Error("invalid record", "err", err) 384 return err 385 } 386 387 // only spindle owner can invite to spindles 388 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 389 if err != nil || !ok { 390 return fmt.Errorf("failed to enforce permissions: %w", err) 391 } 392 393 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 394 if err != nil { 395 return err 396 } 397 398 if memberId.Handle.IsInvalidHandle() { 399 return err 400 } 401 402 ddb, ok := i.Db.Execer.(*db.DB) 403 if !ok { 404 return fmt.Errorf("failed to index profile record, invalid db cast") 405 } 406 407 err = db.AddSpindleMember(ddb, models.SpindleMember{ 408 Did: syntax.DID(did), 409 Rkey: e.Commit.RKey, 410 Instance: record.Instance, 411 Subject: memberId.DID, 412 }) 413 if !ok { 414 return fmt.Errorf("failed to add to db: %w", err) 415 } 416 417 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 418 if err != nil { 419 return fmt.Errorf("failed to update ACLs: %w", err) 420 } 421 422 l.Info("added spindle member") 423 case jmodels.CommitOperationDelete: 424 rkey := e.Commit.RKey 425 426 ddb, ok := i.Db.Execer.(*db.DB) 427 if !ok { 428 return fmt.Errorf("failed to index profile record, invalid db cast") 429 } 430 431 // get record from db first 432 members, err := db.GetSpindleMembers( 433 ddb, 434 orm.FilterEq("did", did), 435 orm.FilterEq("rkey", rkey), 436 ) 437 if err != nil || len(members) != 1 { 438 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 439 } 440 member := members[0] 441 442 tx, err := ddb.Begin() 443 if err != nil { 444 return fmt.Errorf("failed to start txn: %w", err) 445 } 446 447 // remove record by rkey && update enforcer 448 if err = db.RemoveSpindleMember( 449 tx, 450 orm.FilterEq("did", did), 451 orm.FilterEq("rkey", rkey), 452 ); err != nil { 453 return fmt.Errorf("failed to remove from db: %w", err) 454 } 455 456 // update enforcer 457 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 458 if err != nil { 459 return fmt.Errorf("failed to update ACLs: %w", err) 460 } 461 462 if err = tx.Commit(); err != nil { 463 return fmt.Errorf("failed to commit txn: %w", err) 464 } 465 466 if err = i.Enforcer.E.SavePolicy(); err != nil { 467 return fmt.Errorf("failed to save ACLs: %w", err) 468 } 469 470 l.Info("removed spindle member") 471 } 472 473 return nil 474} 475 476func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 477 did := e.Did 478 var err error 479 480 l := i.Logger.With("handler", "ingestSpindle") 481 l = l.With("nsid", e.Commit.Collection) 482 483 switch e.Commit.Operation { 484 case jmodels.CommitOperationCreate: 485 raw := json.RawMessage(e.Commit.Record) 486 record := tangled.Spindle{} 487 err = json.Unmarshal(raw, &record) 488 if err != nil { 489 l.Error("invalid record", "err", err) 490 return err 491 } 492 493 instance := e.Commit.RKey 494 495 ddb, ok := i.Db.Execer.(*db.DB) 496 if !ok { 497 return fmt.Errorf("failed to index profile record, invalid db cast") 498 } 499 500 err := db.AddSpindle(ddb, models.Spindle{ 501 Owner: syntax.DID(did), 502 Instance: instance, 503 }) 504 if err != nil { 505 l.Error("failed to add spindle to db", "err", err, "instance", instance) 506 return err 507 } 508 509 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 510 if err != nil { 511 l.Error("failed to add spindle to db", "err", err, "instance", instance) 512 return err 513 } 514 515 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 516 if err != nil { 517 return fmt.Errorf("failed to mark verified: %w", err) 518 } 519 520 return nil 521 522 case jmodels.CommitOperationDelete: 523 instance := e.Commit.RKey 524 525 ddb, ok := i.Db.Execer.(*db.DB) 526 if !ok { 527 return fmt.Errorf("failed to index profile record, invalid db cast") 528 } 529 530 // get record from db first 531 spindles, err := db.GetSpindles( 532 ddb, 533 orm.FilterEq("owner", did), 534 orm.FilterEq("instance", instance), 535 ) 536 if err != nil || len(spindles) != 1 { 537 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 538 } 539 spindle := spindles[0] 540 541 tx, err := ddb.Begin() 542 if err != nil { 543 return err 544 } 545 defer func() { 546 tx.Rollback() 547 i.Enforcer.E.LoadPolicy() 548 }() 549 550 // remove spindle members first 551 err = db.RemoveSpindleMember( 552 tx, 553 orm.FilterEq("owner", did), 554 orm.FilterEq("instance", instance), 555 ) 556 if err != nil { 557 return err 558 } 559 560 err = db.DeleteSpindle( 561 tx, 562 orm.FilterEq("owner", did), 563 orm.FilterEq("instance", instance), 564 ) 565 if err != nil { 566 return err 567 } 568 569 if spindle.Verified != nil { 570 err = i.Enforcer.RemoveSpindle(instance) 571 if err != nil { 572 return err 573 } 574 } 575 576 err = tx.Commit() 577 if err != nil { 578 return err 579 } 580 581 err = i.Enforcer.E.SavePolicy() 582 if err != nil { 583 return err 584 } 585 } 586 587 return nil 588} 589 590func (i *Ingester) ingestString(e *jmodels.Event) error { 591 did := e.Did 592 rkey := e.Commit.RKey 593 594 var err error 595 596 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 597 l.Info("ingesting record") 598 599 ddb, ok := i.Db.Execer.(*db.DB) 600 if !ok { 601 return fmt.Errorf("failed to index string record, invalid db cast") 602 } 603 604 switch e.Commit.Operation { 605 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 606 raw := json.RawMessage(e.Commit.Record) 607 record := tangled.String{} 608 err = json.Unmarshal(raw, &record) 609 if err != nil { 610 l.Error("invalid record", "err", err) 611 return err 612 } 613 614 string := models.StringFromRecord(did, rkey, record) 615 616 if err = i.Validator.ValidateString(&string); err != nil { 617 l.Error("invalid record", "err", err) 618 return err 619 } 620 621 if err = db.AddString(ddb, string); err != nil { 622 l.Error("failed to add string", "err", err) 623 return err 624 } 625 626 return nil 627 628 case jmodels.CommitOperationDelete: 629 if err := db.DeleteString( 630 ddb, 631 orm.FilterEq("did", did), 632 orm.FilterEq("rkey", rkey), 633 ); err != nil { 634 l.Error("failed to delete", "err", err) 635 return fmt.Errorf("failed to delete string record: %w", err) 636 } 637 638 return nil 639 } 640 641 return nil 642} 643 644func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 645 did := e.Did 646 var err error 647 648 l := i.Logger.With("handler", "ingestKnotMember") 649 l = l.With("nsid", e.Commit.Collection) 650 651 switch e.Commit.Operation { 652 case jmodels.CommitOperationCreate: 653 raw := json.RawMessage(e.Commit.Record) 654 record := tangled.KnotMember{} 655 err = json.Unmarshal(raw, &record) 656 if err != nil { 657 l.Error("invalid record", "err", err) 658 return err 659 } 660 661 // only knot owner can invite to knots 662 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 663 if err != nil || !ok { 664 return fmt.Errorf("failed to enforce permissions: %w", err) 665 } 666 667 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 668 if err != nil { 669 return err 670 } 671 672 if memberId.Handle.IsInvalidHandle() { 673 return err 674 } 675 676 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 677 if err != nil { 678 return fmt.Errorf("failed to update ACLs: %w", err) 679 } 680 681 l.Info("added knot member") 682 case jmodels.CommitOperationDelete: 683 // we don't store knot members in a table (like we do for spindle) 684 // and we can't remove this just yet. possibly fixed if we switch 685 // to either: 686 // 1. a knot_members table like with spindle and store the rkey 687 // 2. use the knot host as the rkey 688 // 689 // TODO: implement member deletion 690 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 691 } 692 693 return nil 694} 695 696func (i *Ingester) ingestKnot(e *jmodels.Event) error { 697 did := e.Did 698 var err error 699 700 l := i.Logger.With("handler", "ingestKnot") 701 l = l.With("nsid", e.Commit.Collection) 702 703 switch e.Commit.Operation { 704 case jmodels.CommitOperationCreate: 705 raw := json.RawMessage(e.Commit.Record) 706 record := tangled.Knot{} 707 err = json.Unmarshal(raw, &record) 708 if err != nil { 709 l.Error("invalid record", "err", err) 710 return err 711 } 712 713 domain := e.Commit.RKey 714 715 ddb, ok := i.Db.Execer.(*db.DB) 716 if !ok { 717 return fmt.Errorf("failed to index profile record, invalid db cast") 718 } 719 720 err := db.AddKnot(ddb, domain, did) 721 if err != nil { 722 l.Error("failed to add knot to db", "err", err, "domain", domain) 723 return err 724 } 725 726 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 727 if err != nil { 728 l.Error("failed to verify knot", "err", err, "domain", domain) 729 return err 730 } 731 732 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 733 if err != nil { 734 return fmt.Errorf("failed to mark verified: %w", err) 735 } 736 737 return nil 738 739 case jmodels.CommitOperationDelete: 740 domain := e.Commit.RKey 741 742 ddb, ok := i.Db.Execer.(*db.DB) 743 if !ok { 744 return fmt.Errorf("failed to index knot record, invalid db cast") 745 } 746 747 // get record from db first 748 registrations, err := db.GetRegistrations( 749 ddb, 750 orm.FilterEq("domain", domain), 751 orm.FilterEq("did", did), 752 ) 753 if err != nil { 754 return fmt.Errorf("failed to get registration: %w", err) 755 } 756 if len(registrations) != 1 { 757 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 758 } 759 registration := registrations[0] 760 761 tx, err := ddb.Begin() 762 if err != nil { 763 return err 764 } 765 defer func() { 766 tx.Rollback() 767 i.Enforcer.E.LoadPolicy() 768 }() 769 770 err = db.DeleteKnot( 771 tx, 772 orm.FilterEq("did", did), 773 orm.FilterEq("domain", domain), 774 ) 775 if err != nil { 776 return err 777 } 778 779 if registration.Registered != nil { 780 err = i.Enforcer.RemoveKnot(domain) 781 if err != nil { 782 return err 783 } 784 } 785 786 err = tx.Commit() 787 if err != nil { 788 return err 789 } 790 791 err = i.Enforcer.E.SavePolicy() 792 if err != nil { 793 return err 794 } 795 } 796 797 return nil 798} 799func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 800 did := e.Did 801 rkey := e.Commit.RKey 802 803 var err error 804 805 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 806 l.Info("ingesting record") 807 808 ddb, ok := i.Db.Execer.(*db.DB) 809 if !ok { 810 return fmt.Errorf("failed to index issue record, invalid db cast") 811 } 812 813 switch e.Commit.Operation { 814 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 815 raw := json.RawMessage(e.Commit.Record) 816 record := tangled.RepoIssue{} 817 err = json.Unmarshal(raw, &record) 818 if err != nil { 819 l.Error("invalid record", "err", err) 820 return err 821 } 822 823 issue := models.IssueFromRecord(did, rkey, record) 824 825 if err := i.Validator.ValidateIssue(&issue); err != nil { 826 return fmt.Errorf("failed to validate issue: %w", err) 827 } 828 829 tx, err := ddb.BeginTx(ctx, nil) 830 if err != nil { 831 l.Error("failed to begin transaction", "err", err) 832 return err 833 } 834 defer tx.Rollback() 835 836 err = db.PutIssue(tx, &issue) 837 if err != nil { 838 l.Error("failed to create issue", "err", err) 839 return err 840 } 841 842 err = tx.Commit() 843 if err != nil { 844 l.Error("failed to commit txn", "err", err) 845 return err 846 } 847 848 return nil 849 850 case jmodels.CommitOperationDelete: 851 tx, err := ddb.BeginTx(ctx, nil) 852 if err != nil { 853 l.Error("failed to begin transaction", "err", err) 854 return err 855 } 856 defer tx.Rollback() 857 858 if err := db.DeleteIssues( 859 tx, 860 did, 861 rkey, 862 ); err != nil { 863 l.Error("failed to delete", "err", err) 864 return fmt.Errorf("failed to delete issue record: %w", err) 865 } 866 if err := tx.Commit(); err != nil { 867 l.Error("failed to commit txn", "err", err) 868 return err 869 } 870 871 return nil 872 } 873 874 return nil 875} 876 877func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 878 did := e.Did 879 rkey := e.Commit.RKey 880 881 var err error 882 883 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 884 l.Info("ingesting record") 885 886 ddb, ok := i.Db.Execer.(*db.DB) 887 if !ok { 888 return fmt.Errorf("failed to index issue comment record, invalid db cast") 889 } 890 891 switch e.Commit.Operation { 892 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 893 raw := json.RawMessage(e.Commit.Record) 894 record := tangled.RepoIssueComment{} 895 err = json.Unmarshal(raw, &record) 896 if err != nil { 897 return fmt.Errorf("invalid record: %w", err) 898 } 899 900 comment, err := models.IssueCommentFromRecord(did, rkey, record) 901 if err != nil { 902 return fmt.Errorf("failed to parse comment from record: %w", err) 903 } 904 905 if err := i.Validator.ValidateIssueComment(comment); err != nil { 906 return fmt.Errorf("failed to validate comment: %w", err) 907 } 908 909 tx, err := ddb.Begin() 910 if err != nil { 911 return fmt.Errorf("failed to start transaction: %w", err) 912 } 913 defer tx.Rollback() 914 915 _, err = db.AddIssueComment(tx, *comment) 916 if err != nil { 917 return fmt.Errorf("failed to create issue comment: %w", err) 918 } 919 920 return tx.Commit() 921 922 case jmodels.CommitOperationDelete: 923 if err := db.DeleteIssueComments( 924 ddb, 925 orm.FilterEq("did", did), 926 orm.FilterEq("rkey", rkey), 927 ); err != nil { 928 return fmt.Errorf("failed to delete issue comment record: %w", err) 929 } 930 931 return nil 932 } 933 934 return nil 935} 936 937func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 938 did := e.Did 939 rkey := e.Commit.RKey 940 941 var err error 942 943 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 944 l.Info("ingesting record") 945 946 ddb, ok := i.Db.Execer.(*db.DB) 947 if !ok { 948 return fmt.Errorf("failed to index label definition, invalid db cast") 949 } 950 951 switch e.Commit.Operation { 952 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 953 raw := json.RawMessage(e.Commit.Record) 954 record := tangled.LabelDefinition{} 955 err = json.Unmarshal(raw, &record) 956 if err != nil { 957 return fmt.Errorf("invalid record: %w", err) 958 } 959 960 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 961 if err != nil { 962 return fmt.Errorf("failed to parse labeldef from record: %w", err) 963 } 964 965 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 966 return fmt.Errorf("failed to validate labeldef: %w", err) 967 } 968 969 _, err = db.AddLabelDefinition(ddb, def) 970 if err != nil { 971 return fmt.Errorf("failed to create labeldef: %w", err) 972 } 973 974 return nil 975 976 case jmodels.CommitOperationDelete: 977 if err := db.DeleteLabelDefinition( 978 ddb, 979 orm.FilterEq("did", did), 980 orm.FilterEq("rkey", rkey), 981 ); err != nil { 982 return fmt.Errorf("failed to delete labeldef record: %w", err) 983 } 984 985 return nil 986 } 987 988 return nil 989} 990 991func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 992 did := e.Did 993 rkey := e.Commit.RKey 994 995 var err error 996 997 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 998 l.Info("ingesting record") 999 1000 ddb, ok := i.Db.Execer.(*db.DB) 1001 if !ok { 1002 return fmt.Errorf("failed to index label op, invalid db cast") 1003 } 1004 1005 switch e.Commit.Operation { 1006 case jmodels.CommitOperationCreate: 1007 raw := json.RawMessage(e.Commit.Record) 1008 record := tangled.LabelOp{} 1009 err = json.Unmarshal(raw, &record) 1010 if err != nil { 1011 return fmt.Errorf("invalid record: %w", err) 1012 } 1013 1014 subject := syntax.ATURI(record.Subject) 1015 collection := subject.Collection() 1016 1017 var repo *models.Repo 1018 switch collection { 1019 case tangled.RepoIssueNSID: 1020 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1021 if err != nil || len(i) != 1 { 1022 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1023 } 1024 repo = i[0].Repo 1025 default: 1026 return fmt.Errorf("unsupport label subject: %s", collection) 1027 } 1028 1029 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1030 if err != nil { 1031 return fmt.Errorf("failed to build label application ctx: %w", err) 1032 } 1033 1034 ops := models.LabelOpsFromRecord(did, rkey, record) 1035 1036 for _, o := range ops { 1037 def, ok := actx.Defs[o.OperandKey] 1038 if !ok { 1039 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1040 } 1041 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1042 return fmt.Errorf("failed to validate labelop: %w", err) 1043 } 1044 } 1045 1046 tx, err := ddb.Begin() 1047 if err != nil { 1048 return err 1049 } 1050 defer tx.Rollback() 1051 1052 for _, o := range ops { 1053 _, err = db.AddLabelOp(tx, &o) 1054 if err != nil { 1055 return fmt.Errorf("failed to add labelop: %w", err) 1056 } 1057 } 1058 1059 if err = tx.Commit(); err != nil { 1060 return err 1061 } 1062 } 1063 1064 return nil 1065}