Monorepo for Tangled
at master 1077 lines 26 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/appview/validator" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/orm" 25 "tangled.org/core/rbac" 26) 27 28type Ingester struct { 29 Db db.DbWrapper 30 Enforcer *rbac.Enforcer 31 IdResolver *idresolver.Resolver 32 Config *config.Config 33 Logger *slog.Logger 34 Validator *validator.Validator 35} 36 37type processFunc func(ctx context.Context, e *jmodels.Event) error 38 39func (i *Ingester) Ingest() processFunc { 40 return func(ctx context.Context, e *jmodels.Event) error { 41 var err error 42 defer func() { 43 eventTime := e.TimeUS 44 lastTimeUs := eventTime + 1 45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 47 } 48 }() 49 50 l := i.Logger.With("kind", e.Kind) 51 switch e.Kind { 52 case jmodels.EventKindAccount: 53 if !e.Account.Active && *e.Account.Status == "deactivated" { 54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 55 } 56 case jmodels.EventKindIdentity: 57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 58 case jmodels.EventKindCommit: 59 switch e.Commit.Collection { 60 case tangled.GraphFollowNSID: 61 err = i.ingestFollow(e) 62 case tangled.FeedStarNSID: 63 err = i.ingestStar(e) 64 case tangled.PublicKeyNSID: 65 err = i.ingestPublicKey(e) 66 case tangled.RepoArtifactNSID: 67 err = i.ingestArtifact(e) 68 case tangled.ActorProfileNSID: 69 err = i.ingestProfile(ctx, e) 70 case tangled.SpindleMemberNSID: 71 err = i.ingestSpindleMember(ctx, e) 72 case tangled.SpindleNSID: 73 err = i.ingestSpindle(ctx, e) 74 case tangled.KnotMemberNSID: 75 err = i.ingestKnotMember(e) 76 case tangled.KnotNSID: 77 err = i.ingestKnot(e) 78 case tangled.StringNSID: 79 err = i.ingestString(e) 80 case tangled.RepoIssueNSID: 81 err = i.ingestIssue(ctx, e) 82 case tangled.RepoIssueCommentNSID: 83 err = i.ingestIssueComment(e) 84 case tangled.LabelDefinitionNSID: 85 err = i.ingestLabelDefinition(e) 86 case tangled.LabelOpNSID: 87 err = i.ingestLabelOp(e) 88 } 89 l = i.Logger.With("nsid", e.Commit.Collection) 90 } 91 92 if err != nil { 93 l.Warn("refused to ingest record", "err", err) 94 } 95 96 return nil 97 } 98} 99 100func (i *Ingester) ingestStar(e *jmodels.Event) error { 101 var err error 102 did := e.Did 103 104 l := i.Logger.With("handler", "ingestStar") 105 l = l.With("nsid", e.Commit.Collection) 106 107 switch e.Commit.Operation { 108 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 109 var subjectUri syntax.ATURI 110 111 raw := json.RawMessage(e.Commit.Record) 112 record := tangled.FeedStar{} 113 err := json.Unmarshal(raw, &record) 114 if err != nil { 115 l.Error("invalid record", "err", err) 116 return err 117 } 118 119 subjectUri, err = syntax.ParseATURI(record.Subject) 120 if err != nil { 121 l.Error("invalid record", "err", err) 122 return err 123 } 124 err = db.AddStar(i.Db, &models.Star{ 125 Did: did, 126 RepoAt: subjectUri, 127 Rkey: e.Commit.RKey, 128 }) 129 case jmodels.CommitOperationDelete: 130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 131 } 132 133 if err != nil { 134 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 135 } 136 137 return nil 138} 139 140func (i *Ingester) ingestFollow(e *jmodels.Event) error { 141 var err error 142 did := e.Did 143 144 l := i.Logger.With("handler", "ingestFollow") 145 l = l.With("nsid", e.Commit.Collection) 146 147 switch e.Commit.Operation { 148 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 149 raw := json.RawMessage(e.Commit.Record) 150 record := tangled.GraphFollow{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 l.Error("invalid record", "err", err) 154 return err 155 } 156 157 err = db.AddFollow(i.Db, &models.Follow{ 158 UserDid: did, 159 SubjectDid: record.Subject, 160 Rkey: e.Commit.RKey, 161 }) 162 case jmodels.CommitOperationDelete: 163 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 164 } 165 166 if err != nil { 167 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 168 } 169 170 return nil 171} 172 173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 174 did := e.Did 175 var err error 176 177 l := i.Logger.With("handler", "ingestPublicKey") 178 l = l.With("nsid", e.Commit.Collection) 179 180 switch e.Commit.Operation { 181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 182 l.Debug("processing add of pubkey") 183 raw := json.RawMessage(e.Commit.Record) 184 record := tangled.PublicKey{} 185 err = json.Unmarshal(raw, &record) 186 if err != nil { 187 l.Error("invalid record", "err", err) 188 return err 189 } 190 191 name := record.Name 192 key := record.Key 193 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 194 case jmodels.CommitOperationDelete: 195 l.Debug("processing delete of pubkey") 196 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 197 } 198 199 if err != nil { 200 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 201 } 202 203 return nil 204} 205 206func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 207 did := e.Did 208 var err error 209 210 l := i.Logger.With("handler", "ingestArtifact") 211 l = l.With("nsid", e.Commit.Collection) 212 213 switch e.Commit.Operation { 214 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 215 raw := json.RawMessage(e.Commit.Record) 216 record := tangled.RepoArtifact{} 217 err = json.Unmarshal(raw, &record) 218 if err != nil { 219 l.Error("invalid record", "err", err) 220 return err 221 } 222 223 repoAt, err := syntax.ParseATURI(record.Repo) 224 if err != nil { 225 return err 226 } 227 228 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 229 if err != nil { 230 return err 231 } 232 233 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 234 if err != nil || !ok { 235 return err 236 } 237 238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 239 if err != nil { 240 createdAt = time.Now() 241 } 242 243 artifact := models.Artifact{ 244 Did: did, 245 Rkey: e.Commit.RKey, 246 RepoAt: repoAt, 247 Tag: plumbing.Hash(record.Tag), 248 CreatedAt: createdAt, 249 BlobCid: cid.Cid(record.Artifact.Ref), 250 Name: record.Name, 251 Size: uint64(record.Artifact.Size), 252 MimeType: record.Artifact.MimeType, 253 } 254 255 err = db.AddArtifact(i.Db, artifact) 256 case jmodels.CommitOperationDelete: 257 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 258 } 259 260 if err != nil { 261 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 262 } 263 264 return nil 265} 266 267func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 268 did := e.Did 269 var err error 270 271 l := i.Logger.With("handler", "ingestProfile") 272 l = l.With("nsid", e.Commit.Collection) 273 274 if e.Commit.RKey != "self" { 275 return fmt.Errorf("ingestProfile only ingests `self` record") 276 } 277 278 switch e.Commit.Operation { 279 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 280 raw := json.RawMessage(e.Commit.Record) 281 record := tangled.ActorProfile{} 282 err = json.Unmarshal(raw, &record) 283 if err != nil { 284 l.Error("invalid record", "err", err) 285 return err 286 } 287 288 avatar := "" 289 if record.Avatar != nil { 290 avatar = record.Avatar.Ref.String() 291 } 292 293 description := "" 294 if record.Description != nil { 295 description = *record.Description 296 } 297 298 includeBluesky := record.Bluesky 299 300 pronouns := "" 301 if record.Pronouns != nil { 302 pronouns = *record.Pronouns 303 } 304 305 location := "" 306 if record.Location != nil { 307 location = *record.Location 308 } 309 310 var links [5]string 311 for i, l := range record.Links { 312 if i < 5 { 313 links[i] = l 314 } 315 } 316 317 var stats [2]models.VanityStat 318 for i, s := range record.Stats { 319 if i < 2 { 320 stats[i].Kind = models.ParseVanityStatKind(s) 321 } 322 } 323 324 var pinned [6]syntax.ATURI 325 for i, r := range record.PinnedRepositories { 326 if i < 6 { 327 pinned[i] = syntax.ATURI(r) 328 } 329 } 330 331 var preferredHandle syntax.Handle 332 if record.PreferredHandle != nil { 333 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 334 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 335 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 336 preferredHandle = h 337 } 338 } 339 } 340 341 profile := models.Profile{ 342 Did: did, 343 Avatar: avatar, 344 Description: description, 345 IncludeBluesky: includeBluesky, 346 Location: location, 347 Links: links, 348 Stats: stats, 349 PinnedRepos: pinned, 350 Pronouns: pronouns, 351 PreferredHandle: preferredHandle, 352 } 353 354 ddb, ok := i.Db.Execer.(*db.DB) 355 if !ok { 356 return fmt.Errorf("failed to index profile record, invalid db cast") 357 } 358 359 tx, err := ddb.Begin() 360 if err != nil { 361 return fmt.Errorf("failed to start transaction") 362 } 363 364 err = db.ValidateProfile(tx, &profile) 365 if err != nil { 366 return fmt.Errorf("invalid profile record") 367 } 368 369 err = db.UpsertProfile(tx, &profile) 370 case jmodels.CommitOperationDelete: 371 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 372 } 373 374 if err != nil { 375 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 376 } 377 378 return nil 379} 380 381func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 382 did := e.Did 383 var err error 384 385 l := i.Logger.With("handler", "ingestSpindleMember") 386 l = l.With("nsid", e.Commit.Collection) 387 388 switch e.Commit.Operation { 389 case jmodels.CommitOperationCreate: 390 raw := json.RawMessage(e.Commit.Record) 391 record := tangled.SpindleMember{} 392 err = json.Unmarshal(raw, &record) 393 if err != nil { 394 l.Error("invalid record", "err", err) 395 return err 396 } 397 398 // only spindle owner can invite to spindles 399 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 400 if err != nil || !ok { 401 return fmt.Errorf("failed to enforce permissions: %w", err) 402 } 403 404 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 405 if err != nil { 406 return err 407 } 408 409 if memberId.Handle.IsInvalidHandle() { 410 return err 411 } 412 413 ddb, ok := i.Db.Execer.(*db.DB) 414 if !ok { 415 return fmt.Errorf("invalid db cast") 416 } 417 418 err = db.AddSpindleMember(ddb, models.SpindleMember{ 419 Did: syntax.DID(did), 420 Rkey: e.Commit.RKey, 421 Instance: record.Instance, 422 Subject: memberId.DID, 423 }) 424 if !ok { 425 return fmt.Errorf("failed to add to db: %w", err) 426 } 427 428 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 429 if err != nil { 430 return fmt.Errorf("failed to update ACLs: %w", err) 431 } 432 433 l.Info("added spindle member") 434 case jmodels.CommitOperationDelete: 435 rkey := e.Commit.RKey 436 437 ddb, ok := i.Db.Execer.(*db.DB) 438 if !ok { 439 return fmt.Errorf("failed to index profile record, invalid db cast") 440 } 441 442 // get record from db first 443 members, err := db.GetSpindleMembers( 444 ddb, 445 orm.FilterEq("did", did), 446 orm.FilterEq("rkey", rkey), 447 ) 448 if err != nil || len(members) != 1 { 449 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 450 } 451 member := members[0] 452 453 tx, err := ddb.Begin() 454 if err != nil { 455 return fmt.Errorf("failed to start txn: %w", err) 456 } 457 458 // remove record by rkey && update enforcer 459 if err = db.RemoveSpindleMember( 460 tx, 461 orm.FilterEq("did", did), 462 orm.FilterEq("rkey", rkey), 463 ); err != nil { 464 return fmt.Errorf("failed to remove from db: %w", err) 465 } 466 467 // update enforcer 468 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 469 if err != nil { 470 return fmt.Errorf("failed to update ACLs: %w", err) 471 } 472 473 if err = tx.Commit(); err != nil { 474 return fmt.Errorf("failed to commit txn: %w", err) 475 } 476 477 if err = i.Enforcer.E.SavePolicy(); err != nil { 478 return fmt.Errorf("failed to save ACLs: %w", err) 479 } 480 481 l.Info("removed spindle member") 482 } 483 484 return nil 485} 486 487func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 488 did := e.Did 489 var err error 490 491 l := i.Logger.With("handler", "ingestSpindle") 492 l = l.With("nsid", e.Commit.Collection) 493 494 switch e.Commit.Operation { 495 case jmodels.CommitOperationCreate: 496 raw := json.RawMessage(e.Commit.Record) 497 record := tangled.Spindle{} 498 err = json.Unmarshal(raw, &record) 499 if err != nil { 500 l.Error("invalid record", "err", err) 501 return err 502 } 503 504 instance := e.Commit.RKey 505 506 ddb, ok := i.Db.Execer.(*db.DB) 507 if !ok { 508 return fmt.Errorf("failed to index profile record, invalid db cast") 509 } 510 511 err := db.AddSpindle(ddb, models.Spindle{ 512 Owner: syntax.DID(did), 513 Instance: instance, 514 }) 515 if err != nil { 516 l.Error("failed to add spindle to db", "err", err, "instance", instance) 517 return err 518 } 519 520 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 521 if err != nil { 522 l.Error("failed to add spindle to db", "err", err, "instance", instance) 523 return err 524 } 525 526 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 527 if err != nil { 528 return fmt.Errorf("failed to mark verified: %w", err) 529 } 530 531 return nil 532 533 case jmodels.CommitOperationDelete: 534 instance := e.Commit.RKey 535 536 ddb, ok := i.Db.Execer.(*db.DB) 537 if !ok { 538 return fmt.Errorf("failed to index profile record, invalid db cast") 539 } 540 541 // get record from db first 542 spindles, err := db.GetSpindles( 543 ctx, 544 ddb, 545 orm.FilterEq("owner", did), 546 orm.FilterEq("instance", instance), 547 ) 548 if err != nil || len(spindles) != 1 { 549 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 550 } 551 spindle := spindles[0] 552 553 tx, err := ddb.Begin() 554 if err != nil { 555 return err 556 } 557 defer func() { 558 tx.Rollback() 559 i.Enforcer.E.LoadPolicy() 560 }() 561 562 // remove spindle members first 563 err = db.RemoveSpindleMember( 564 tx, 565 orm.FilterEq("owner", did), 566 orm.FilterEq("instance", instance), 567 ) 568 if err != nil { 569 return err 570 } 571 572 err = db.DeleteSpindle( 573 tx, 574 orm.FilterEq("owner", did), 575 orm.FilterEq("instance", instance), 576 ) 577 if err != nil { 578 return err 579 } 580 581 if spindle.Verified != nil { 582 err = i.Enforcer.RemoveSpindle(instance) 583 if err != nil { 584 return err 585 } 586 } 587 588 err = tx.Commit() 589 if err != nil { 590 return err 591 } 592 593 err = i.Enforcer.E.SavePolicy() 594 if err != nil { 595 return err 596 } 597 } 598 599 return nil 600} 601 602func (i *Ingester) ingestString(e *jmodels.Event) error { 603 did := e.Did 604 rkey := e.Commit.RKey 605 606 var err error 607 608 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 609 l.Info("ingesting record") 610 611 ddb, ok := i.Db.Execer.(*db.DB) 612 if !ok { 613 return fmt.Errorf("failed to index string record, invalid db cast") 614 } 615 616 switch e.Commit.Operation { 617 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 618 raw := json.RawMessage(e.Commit.Record) 619 record := tangled.String{} 620 err = json.Unmarshal(raw, &record) 621 if err != nil { 622 l.Error("invalid record", "err", err) 623 return err 624 } 625 626 string := models.StringFromRecord(did, rkey, record) 627 628 if err = i.Validator.ValidateString(&string); err != nil { 629 l.Error("invalid record", "err", err) 630 return err 631 } 632 633 if err = db.AddString(ddb, string); err != nil { 634 l.Error("failed to add string", "err", err) 635 return err 636 } 637 638 return nil 639 640 case jmodels.CommitOperationDelete: 641 if err := db.DeleteString( 642 ddb, 643 orm.FilterEq("did", did), 644 orm.FilterEq("rkey", rkey), 645 ); err != nil { 646 l.Error("failed to delete", "err", err) 647 return fmt.Errorf("failed to delete string record: %w", err) 648 } 649 650 return nil 651 } 652 653 return nil 654} 655 656func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 657 did := e.Did 658 var err error 659 660 l := i.Logger.With("handler", "ingestKnotMember") 661 l = l.With("nsid", e.Commit.Collection) 662 663 switch e.Commit.Operation { 664 case jmodels.CommitOperationCreate: 665 raw := json.RawMessage(e.Commit.Record) 666 record := tangled.KnotMember{} 667 err = json.Unmarshal(raw, &record) 668 if err != nil { 669 l.Error("invalid record", "err", err) 670 return err 671 } 672 673 // only knot owner can invite to knots 674 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 675 if err != nil || !ok { 676 return fmt.Errorf("failed to enforce permissions: %w", err) 677 } 678 679 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 680 if err != nil { 681 return err 682 } 683 684 if memberId.Handle.IsInvalidHandle() { 685 return err 686 } 687 688 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 689 if err != nil { 690 return fmt.Errorf("failed to update ACLs: %w", err) 691 } 692 693 l.Info("added knot member") 694 case jmodels.CommitOperationDelete: 695 // we don't store knot members in a table (like we do for spindle) 696 // and we can't remove this just yet. possibly fixed if we switch 697 // to either: 698 // 1. a knot_members table like with spindle and store the rkey 699 // 2. use the knot host as the rkey 700 // 701 // TODO: implement member deletion 702 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 703 } 704 705 return nil 706} 707 708func (i *Ingester) ingestKnot(e *jmodels.Event) error { 709 did := e.Did 710 var err error 711 712 l := i.Logger.With("handler", "ingestKnot") 713 l = l.With("nsid", e.Commit.Collection) 714 715 switch e.Commit.Operation { 716 case jmodels.CommitOperationCreate: 717 raw := json.RawMessage(e.Commit.Record) 718 record := tangled.Knot{} 719 err = json.Unmarshal(raw, &record) 720 if err != nil { 721 l.Error("invalid record", "err", err) 722 return err 723 } 724 725 domain := e.Commit.RKey 726 727 ddb, ok := i.Db.Execer.(*db.DB) 728 if !ok { 729 return fmt.Errorf("failed to index profile record, invalid db cast") 730 } 731 732 err := db.AddKnot(ddb, domain, did) 733 if err != nil { 734 l.Error("failed to add knot to db", "err", err, "domain", domain) 735 return err 736 } 737 738 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 739 if err != nil { 740 l.Error("failed to verify knot", "err", err, "domain", domain) 741 return err 742 } 743 744 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 745 if err != nil { 746 return fmt.Errorf("failed to mark verified: %w", err) 747 } 748 749 return nil 750 751 case jmodels.CommitOperationDelete: 752 domain := e.Commit.RKey 753 754 ddb, ok := i.Db.Execer.(*db.DB) 755 if !ok { 756 return fmt.Errorf("failed to index knot record, invalid db cast") 757 } 758 759 // get record from db first 760 registrations, err := db.GetRegistrations( 761 ddb, 762 orm.FilterEq("domain", domain), 763 orm.FilterEq("did", did), 764 ) 765 if err != nil { 766 return fmt.Errorf("failed to get registration: %w", err) 767 } 768 if len(registrations) != 1 { 769 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 770 } 771 registration := registrations[0] 772 773 tx, err := ddb.Begin() 774 if err != nil { 775 return err 776 } 777 defer func() { 778 tx.Rollback() 779 i.Enforcer.E.LoadPolicy() 780 }() 781 782 err = db.DeleteKnot( 783 tx, 784 orm.FilterEq("did", did), 785 orm.FilterEq("domain", domain), 786 ) 787 if err != nil { 788 return err 789 } 790 791 if registration.Registered != nil { 792 err = i.Enforcer.RemoveKnot(domain) 793 if err != nil { 794 return err 795 } 796 } 797 798 err = tx.Commit() 799 if err != nil { 800 return err 801 } 802 803 err = i.Enforcer.E.SavePolicy() 804 if err != nil { 805 return err 806 } 807 } 808 809 return nil 810} 811func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 812 did := e.Did 813 rkey := e.Commit.RKey 814 815 var err error 816 817 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 818 l.Info("ingesting record") 819 820 ddb, ok := i.Db.Execer.(*db.DB) 821 if !ok { 822 return fmt.Errorf("failed to index issue record, invalid db cast") 823 } 824 825 switch e.Commit.Operation { 826 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 827 raw := json.RawMessage(e.Commit.Record) 828 record := tangled.RepoIssue{} 829 err = json.Unmarshal(raw, &record) 830 if err != nil { 831 l.Error("invalid record", "err", err) 832 return err 833 } 834 835 issue := models.IssueFromRecord(did, rkey, record) 836 837 if err := i.Validator.ValidateIssue(&issue); err != nil { 838 return fmt.Errorf("failed to validate issue: %w", err) 839 } 840 841 tx, err := ddb.BeginTx(ctx, nil) 842 if err != nil { 843 l.Error("failed to begin transaction", "err", err) 844 return err 845 } 846 defer tx.Rollback() 847 848 err = db.PutIssue(tx, &issue) 849 if err != nil { 850 l.Error("failed to create issue", "err", err) 851 return err 852 } 853 854 err = tx.Commit() 855 if err != nil { 856 l.Error("failed to commit txn", "err", err) 857 return err 858 } 859 860 return nil 861 862 case jmodels.CommitOperationDelete: 863 tx, err := ddb.BeginTx(ctx, nil) 864 if err != nil { 865 l.Error("failed to begin transaction", "err", err) 866 return err 867 } 868 defer tx.Rollback() 869 870 if err := db.DeleteIssues( 871 tx, 872 did, 873 rkey, 874 ); err != nil { 875 l.Error("failed to delete", "err", err) 876 return fmt.Errorf("failed to delete issue record: %w", err) 877 } 878 if err := tx.Commit(); err != nil { 879 l.Error("failed to commit txn", "err", err) 880 return err 881 } 882 883 return nil 884 } 885 886 return nil 887} 888 889func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 890 did := e.Did 891 rkey := e.Commit.RKey 892 893 var err error 894 895 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 896 l.Info("ingesting record") 897 898 ddb, ok := i.Db.Execer.(*db.DB) 899 if !ok { 900 return fmt.Errorf("failed to index issue comment record, invalid db cast") 901 } 902 903 switch e.Commit.Operation { 904 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 905 raw := json.RawMessage(e.Commit.Record) 906 record := tangled.RepoIssueComment{} 907 err = json.Unmarshal(raw, &record) 908 if err != nil { 909 return fmt.Errorf("invalid record: %w", err) 910 } 911 912 comment, err := models.IssueCommentFromRecord(did, rkey, record) 913 if err != nil { 914 return fmt.Errorf("failed to parse comment from record: %w", err) 915 } 916 917 if err := i.Validator.ValidateIssueComment(comment); err != nil { 918 return fmt.Errorf("failed to validate comment: %w", err) 919 } 920 921 tx, err := ddb.Begin() 922 if err != nil { 923 return fmt.Errorf("failed to start transaction: %w", err) 924 } 925 defer tx.Rollback() 926 927 _, err = db.AddIssueComment(tx, *comment) 928 if err != nil { 929 return fmt.Errorf("failed to create issue comment: %w", err) 930 } 931 932 return tx.Commit() 933 934 case jmodels.CommitOperationDelete: 935 if err := db.DeleteIssueComments( 936 ddb, 937 orm.FilterEq("did", did), 938 orm.FilterEq("rkey", rkey), 939 ); err != nil { 940 return fmt.Errorf("failed to delete issue comment record: %w", err) 941 } 942 943 return nil 944 } 945 946 return nil 947} 948 949func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 950 did := e.Did 951 rkey := e.Commit.RKey 952 953 var err error 954 955 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 956 l.Info("ingesting record") 957 958 ddb, ok := i.Db.Execer.(*db.DB) 959 if !ok { 960 return fmt.Errorf("failed to index label definition, invalid db cast") 961 } 962 963 switch e.Commit.Operation { 964 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 965 raw := json.RawMessage(e.Commit.Record) 966 record := tangled.LabelDefinition{} 967 err = json.Unmarshal(raw, &record) 968 if err != nil { 969 return fmt.Errorf("invalid record: %w", err) 970 } 971 972 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 973 if err != nil { 974 return fmt.Errorf("failed to parse labeldef from record: %w", err) 975 } 976 977 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 978 return fmt.Errorf("failed to validate labeldef: %w", err) 979 } 980 981 _, err = db.AddLabelDefinition(ddb, def) 982 if err != nil { 983 return fmt.Errorf("failed to create labeldef: %w", err) 984 } 985 986 return nil 987 988 case jmodels.CommitOperationDelete: 989 if err := db.DeleteLabelDefinition( 990 ddb, 991 orm.FilterEq("did", did), 992 orm.FilterEq("rkey", rkey), 993 ); err != nil { 994 return fmt.Errorf("failed to delete labeldef record: %w", err) 995 } 996 997 return nil 998 } 999 1000 return nil 1001} 1002 1003func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1004 did := e.Did 1005 rkey := e.Commit.RKey 1006 1007 var err error 1008 1009 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1010 l.Info("ingesting record") 1011 1012 ddb, ok := i.Db.Execer.(*db.DB) 1013 if !ok { 1014 return fmt.Errorf("failed to index label op, invalid db cast") 1015 } 1016 1017 switch e.Commit.Operation { 1018 case jmodels.CommitOperationCreate: 1019 raw := json.RawMessage(e.Commit.Record) 1020 record := tangled.LabelOp{} 1021 err = json.Unmarshal(raw, &record) 1022 if err != nil { 1023 return fmt.Errorf("invalid record: %w", err) 1024 } 1025 1026 subject := syntax.ATURI(record.Subject) 1027 collection := subject.Collection() 1028 1029 var repo *models.Repo 1030 switch collection { 1031 case tangled.RepoIssueNSID: 1032 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1033 if err != nil || len(i) != 1 { 1034 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1035 } 1036 repo = i[0].Repo 1037 default: 1038 return fmt.Errorf("unsupport label subject: %s", collection) 1039 } 1040 1041 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1042 if err != nil { 1043 return fmt.Errorf("failed to build label application ctx: %w", err) 1044 } 1045 1046 ops := models.LabelOpsFromRecord(did, rkey, record) 1047 1048 for _, o := range ops { 1049 def, ok := actx.Defs[o.OperandKey] 1050 if !ok { 1051 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1052 } 1053 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1054 return fmt.Errorf("failed to validate labelop: %w", err) 1055 } 1056 } 1057 1058 tx, err := ddb.Begin() 1059 if err != nil { 1060 return err 1061 } 1062 defer tx.Rollback() 1063 1064 for _, o := range ops { 1065 _, err = db.AddLabelOp(tx, &o) 1066 if err != nil { 1067 return fmt.Errorf("failed to add labelop: %w", err) 1068 } 1069 } 1070 1071 if err = tx.Commit(); err != nil { 1072 return err 1073 } 1074 } 1075 1076 return nil 1077}