Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/appview/validator"
23 "tangled.org/core/idresolver"
24 "tangled.org/core/orm"
25 "tangled.org/core/rbac"
26)
27
28type Ingester struct {
29 Db db.DbWrapper
30 Enforcer *rbac.Enforcer
31 IdResolver *idresolver.Resolver
32 Config *config.Config
33 Logger *slog.Logger
34 Validator *validator.Validator
35}
36
37type processFunc func(ctx context.Context, e *jmodels.Event) error
38
39func (i *Ingester) Ingest() processFunc {
40 return func(ctx context.Context, e *jmodels.Event) error {
41 var err error
42 defer func() {
43 eventTime := e.TimeUS
44 lastTimeUs := eventTime + 1
45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
47 }
48 }()
49
50 l := i.Logger.With("kind", e.Kind)
51 switch e.Kind {
52 case jmodels.EventKindAccount:
53 if !e.Account.Active && *e.Account.Status == "deactivated" {
54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
55 }
56 case jmodels.EventKindIdentity:
57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
58 case jmodels.EventKindCommit:
59 switch e.Commit.Collection {
60 case tangled.GraphFollowNSID:
61 err = i.ingestFollow(e)
62 case tangled.FeedStarNSID:
63 err = i.ingestStar(e)
64 case tangled.PublicKeyNSID:
65 err = i.ingestPublicKey(e)
66 case tangled.RepoArtifactNSID:
67 err = i.ingestArtifact(e)
68 case tangled.ActorProfileNSID:
69 err = i.ingestProfile(e)
70 case tangled.SpindleMemberNSID:
71 err = i.ingestSpindleMember(ctx, e)
72 case tangled.SpindleNSID:
73 err = i.ingestSpindle(ctx, e)
74 case tangled.KnotMemberNSID:
75 err = i.ingestKnotMember(e)
76 case tangled.KnotNSID:
77 err = i.ingestKnot(e)
78 case tangled.StringNSID:
79 err = i.ingestString(e)
80 case tangled.RepoIssueNSID:
81 err = i.ingestIssue(ctx, e)
82 case tangled.RepoIssueCommentNSID:
83 err = i.ingestIssueComment(e)
84 case tangled.LabelDefinitionNSID:
85 err = i.ingestLabelDefinition(e)
86 case tangled.LabelOpNSID:
87 err = i.ingestLabelOp(e)
88 }
89 l = i.Logger.With("nsid", e.Commit.Collection)
90 }
91
92 if err != nil {
93 l.Warn("refused to ingest record", "err", err)
94 }
95
96 return nil
97 }
98}
99
100func (i *Ingester) ingestStar(e *jmodels.Event) error {
101 var err error
102 did := e.Did
103
104 l := i.Logger.With("handler", "ingestStar")
105 l = l.With("nsid", e.Commit.Collection)
106
107 switch e.Commit.Operation {
108 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
109 var subjectUri syntax.ATURI
110
111 raw := json.RawMessage(e.Commit.Record)
112 record := tangled.FeedStar{}
113 err := json.Unmarshal(raw, &record)
114 if err != nil {
115 l.Error("invalid record", "err", err)
116 return err
117 }
118
119 subjectUri, err = syntax.ParseATURI(record.Subject)
120 if err != nil {
121 l.Error("invalid record", "err", err)
122 return err
123 }
124 err = db.AddStar(i.Db, &models.Star{
125 Did: did,
126 RepoAt: subjectUri,
127 Rkey: e.Commit.RKey,
128 })
129 case jmodels.CommitOperationDelete:
130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
131 }
132
133 if err != nil {
134 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
135 }
136
137 return nil
138}
139
140func (i *Ingester) ingestFollow(e *jmodels.Event) error {
141 var err error
142 did := e.Did
143
144 l := i.Logger.With("handler", "ingestFollow")
145 l = l.With("nsid", e.Commit.Collection)
146
147 switch e.Commit.Operation {
148 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
149 raw := json.RawMessage(e.Commit.Record)
150 record := tangled.GraphFollow{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 l.Error("invalid record", "err", err)
154 return err
155 }
156
157 err = db.AddFollow(i.Db, &models.Follow{
158 UserDid: did,
159 SubjectDid: record.Subject,
160 Rkey: e.Commit.RKey,
161 })
162 case jmodels.CommitOperationDelete:
163 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
164 }
165
166 if err != nil {
167 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
168 }
169
170 return nil
171}
172
173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
174 did := e.Did
175 var err error
176
177 l := i.Logger.With("handler", "ingestPublicKey")
178 l = l.With("nsid", e.Commit.Collection)
179
180 switch e.Commit.Operation {
181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
182 l.Debug("processing add of pubkey")
183 raw := json.RawMessage(e.Commit.Record)
184 record := tangled.PublicKey{}
185 err = json.Unmarshal(raw, &record)
186 if err != nil {
187 l.Error("invalid record", "err", err)
188 return err
189 }
190
191 name := record.Name
192 key := record.Key
193 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
194 case jmodels.CommitOperationDelete:
195 l.Debug("processing delete of pubkey")
196 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
197 }
198
199 if err != nil {
200 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
201 }
202
203 return nil
204}
205
206func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
207 did := e.Did
208 var err error
209
210 l := i.Logger.With("handler", "ingestArtifact")
211 l = l.With("nsid", e.Commit.Collection)
212
213 switch e.Commit.Operation {
214 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
215 raw := json.RawMessage(e.Commit.Record)
216 record := tangled.RepoArtifact{}
217 err = json.Unmarshal(raw, &record)
218 if err != nil {
219 l.Error("invalid record", "err", err)
220 return err
221 }
222
223 repoAt, err := syntax.ParseATURI(record.Repo)
224 if err != nil {
225 return err
226 }
227
228 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
229 if err != nil {
230 return err
231 }
232
233 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
234 if err != nil || !ok {
235 return err
236 }
237
238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
239 if err != nil {
240 createdAt = time.Now()
241 }
242
243 artifact := models.Artifact{
244 Did: did,
245 Rkey: e.Commit.RKey,
246 RepoAt: repoAt,
247 Tag: plumbing.Hash(record.Tag),
248 CreatedAt: createdAt,
249 BlobCid: cid.Cid(record.Artifact.Ref),
250 Name: record.Name,
251 Size: uint64(record.Artifact.Size),
252 MimeType: record.Artifact.MimeType,
253 }
254
255 err = db.AddArtifact(i.Db, artifact)
256 case jmodels.CommitOperationDelete:
257 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
258 }
259
260 if err != nil {
261 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
262 }
263
264 return nil
265}
266
267func (i *Ingester) ingestProfile(e *jmodels.Event) error {
268 did := e.Did
269 var err error
270
271 l := i.Logger.With("handler", "ingestProfile")
272 l = l.With("nsid", e.Commit.Collection)
273
274 if e.Commit.RKey != "self" {
275 return fmt.Errorf("ingestProfile only ingests `self` record")
276 }
277
278 switch e.Commit.Operation {
279 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
280 raw := json.RawMessage(e.Commit.Record)
281 record := tangled.ActorProfile{}
282 err = json.Unmarshal(raw, &record)
283 if err != nil {
284 l.Error("invalid record", "err", err)
285 return err
286 }
287
288 avatar := ""
289 if record.Avatar != nil {
290 avatar = record.Avatar.Ref.String()
291 }
292
293 description := ""
294 if record.Description != nil {
295 description = *record.Description
296 }
297
298 includeBluesky := record.Bluesky
299
300 pronouns := ""
301 if record.Pronouns != nil {
302 pronouns = *record.Pronouns
303 }
304
305 location := ""
306 if record.Location != nil {
307 location = *record.Location
308 }
309
310 var links [5]string
311 for i, l := range record.Links {
312 if i < 5 {
313 links[i] = l
314 }
315 }
316
317 var stats [2]models.VanityStat
318 for i, s := range record.Stats {
319 if i < 2 {
320 stats[i].Kind = models.VanityStatKind(s)
321 }
322 }
323
324 var pinned [6]syntax.ATURI
325 for i, r := range record.PinnedRepositories {
326 if i < 6 {
327 pinned[i] = syntax.ATURI(r)
328 }
329 }
330
331 profile := models.Profile{
332 Did: did,
333 Avatar: avatar,
334 Description: description,
335 IncludeBluesky: includeBluesky,
336 Location: location,
337 Links: links,
338 Stats: stats,
339 PinnedRepos: pinned,
340 Pronouns: pronouns,
341 }
342
343 ddb, ok := i.Db.Execer.(*db.DB)
344 if !ok {
345 return fmt.Errorf("failed to index profile record, invalid db cast")
346 }
347
348 tx, err := ddb.Begin()
349 if err != nil {
350 return fmt.Errorf("failed to start transaction")
351 }
352
353 err = db.ValidateProfile(tx, &profile)
354 if err != nil {
355 return fmt.Errorf("invalid profile record")
356 }
357
358 err = db.UpsertProfile(tx, &profile)
359 case jmodels.CommitOperationDelete:
360 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
361 }
362
363 if err != nil {
364 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
365 }
366
367 return nil
368}
369
370func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
371 did := e.Did
372 var err error
373
374 l := i.Logger.With("handler", "ingestSpindleMember")
375 l = l.With("nsid", e.Commit.Collection)
376
377 switch e.Commit.Operation {
378 case jmodels.CommitOperationCreate:
379 raw := json.RawMessage(e.Commit.Record)
380 record := tangled.SpindleMember{}
381 err = json.Unmarshal(raw, &record)
382 if err != nil {
383 l.Error("invalid record", "err", err)
384 return err
385 }
386
387 // only spindle owner can invite to spindles
388 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
389 if err != nil || !ok {
390 return fmt.Errorf("failed to enforce permissions: %w", err)
391 }
392
393 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
394 if err != nil {
395 return err
396 }
397
398 if memberId.Handle.IsInvalidHandle() {
399 return err
400 }
401
402 ddb, ok := i.Db.Execer.(*db.DB)
403 if !ok {
404 return fmt.Errorf("failed to index profile record, invalid db cast")
405 }
406
407 err = db.AddSpindleMember(ddb, models.SpindleMember{
408 Did: syntax.DID(did),
409 Rkey: e.Commit.RKey,
410 Instance: record.Instance,
411 Subject: memberId.DID,
412 })
413 if !ok {
414 return fmt.Errorf("failed to add to db: %w", err)
415 }
416
417 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
418 if err != nil {
419 return fmt.Errorf("failed to update ACLs: %w", err)
420 }
421
422 l.Info("added spindle member")
423 case jmodels.CommitOperationDelete:
424 rkey := e.Commit.RKey
425
426 ddb, ok := i.Db.Execer.(*db.DB)
427 if !ok {
428 return fmt.Errorf("failed to index profile record, invalid db cast")
429 }
430
431 // get record from db first
432 members, err := db.GetSpindleMembers(
433 ddb,
434 orm.FilterEq("did", did),
435 orm.FilterEq("rkey", rkey),
436 )
437 if err != nil || len(members) != 1 {
438 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
439 }
440 member := members[0]
441
442 tx, err := ddb.Begin()
443 if err != nil {
444 return fmt.Errorf("failed to start txn: %w", err)
445 }
446
447 // remove record by rkey && update enforcer
448 if err = db.RemoveSpindleMember(
449 tx,
450 orm.FilterEq("did", did),
451 orm.FilterEq("rkey", rkey),
452 ); err != nil {
453 return fmt.Errorf("failed to remove from db: %w", err)
454 }
455
456 // update enforcer
457 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
458 if err != nil {
459 return fmt.Errorf("failed to update ACLs: %w", err)
460 }
461
462 if err = tx.Commit(); err != nil {
463 return fmt.Errorf("failed to commit txn: %w", err)
464 }
465
466 if err = i.Enforcer.E.SavePolicy(); err != nil {
467 return fmt.Errorf("failed to save ACLs: %w", err)
468 }
469
470 l.Info("removed spindle member")
471 }
472
473 return nil
474}
475
476func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
477 did := e.Did
478 var err error
479
480 l := i.Logger.With("handler", "ingestSpindle")
481 l = l.With("nsid", e.Commit.Collection)
482
483 switch e.Commit.Operation {
484 case jmodels.CommitOperationCreate:
485 raw := json.RawMessage(e.Commit.Record)
486 record := tangled.Spindle{}
487 err = json.Unmarshal(raw, &record)
488 if err != nil {
489 l.Error("invalid record", "err", err)
490 return err
491 }
492
493 instance := e.Commit.RKey
494
495 ddb, ok := i.Db.Execer.(*db.DB)
496 if !ok {
497 return fmt.Errorf("failed to index profile record, invalid db cast")
498 }
499
500 err := db.AddSpindle(ddb, models.Spindle{
501 Owner: syntax.DID(did),
502 Instance: instance,
503 })
504 if err != nil {
505 l.Error("failed to add spindle to db", "err", err, "instance", instance)
506 return err
507 }
508
509 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
510 if err != nil {
511 l.Error("failed to add spindle to db", "err", err, "instance", instance)
512 return err
513 }
514
515 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
516 if err != nil {
517 return fmt.Errorf("failed to mark verified: %w", err)
518 }
519
520 return nil
521
522 case jmodels.CommitOperationDelete:
523 instance := e.Commit.RKey
524
525 ddb, ok := i.Db.Execer.(*db.DB)
526 if !ok {
527 return fmt.Errorf("failed to index profile record, invalid db cast")
528 }
529
530 // get record from db first
531 spindles, err := db.GetSpindles(
532 ddb,
533 orm.FilterEq("owner", did),
534 orm.FilterEq("instance", instance),
535 )
536 if err != nil || len(spindles) != 1 {
537 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
538 }
539 spindle := spindles[0]
540
541 tx, err := ddb.Begin()
542 if err != nil {
543 return err
544 }
545 defer func() {
546 tx.Rollback()
547 i.Enforcer.E.LoadPolicy()
548 }()
549
550 // remove spindle members first
551 err = db.RemoveSpindleMember(
552 tx,
553 orm.FilterEq("owner", did),
554 orm.FilterEq("instance", instance),
555 )
556 if err != nil {
557 return err
558 }
559
560 err = db.DeleteSpindle(
561 tx,
562 orm.FilterEq("owner", did),
563 orm.FilterEq("instance", instance),
564 )
565 if err != nil {
566 return err
567 }
568
569 if spindle.Verified != nil {
570 err = i.Enforcer.RemoveSpindle(instance)
571 if err != nil {
572 return err
573 }
574 }
575
576 err = tx.Commit()
577 if err != nil {
578 return err
579 }
580
581 err = i.Enforcer.E.SavePolicy()
582 if err != nil {
583 return err
584 }
585 }
586
587 return nil
588}
589
590func (i *Ingester) ingestString(e *jmodels.Event) error {
591 did := e.Did
592 rkey := e.Commit.RKey
593
594 var err error
595
596 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
597 l.Info("ingesting record")
598
599 ddb, ok := i.Db.Execer.(*db.DB)
600 if !ok {
601 return fmt.Errorf("failed to index string record, invalid db cast")
602 }
603
604 switch e.Commit.Operation {
605 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
606 raw := json.RawMessage(e.Commit.Record)
607 record := tangled.String{}
608 err = json.Unmarshal(raw, &record)
609 if err != nil {
610 l.Error("invalid record", "err", err)
611 return err
612 }
613
614 string := models.StringFromRecord(did, rkey, record)
615
616 if err = i.Validator.ValidateString(&string); err != nil {
617 l.Error("invalid record", "err", err)
618 return err
619 }
620
621 if err = db.AddString(ddb, string); err != nil {
622 l.Error("failed to add string", "err", err)
623 return err
624 }
625
626 return nil
627
628 case jmodels.CommitOperationDelete:
629 if err := db.DeleteString(
630 ddb,
631 orm.FilterEq("did", did),
632 orm.FilterEq("rkey", rkey),
633 ); err != nil {
634 l.Error("failed to delete", "err", err)
635 return fmt.Errorf("failed to delete string record: %w", err)
636 }
637
638 return nil
639 }
640
641 return nil
642}
643
644func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
645 did := e.Did
646 var err error
647
648 l := i.Logger.With("handler", "ingestKnotMember")
649 l = l.With("nsid", e.Commit.Collection)
650
651 switch e.Commit.Operation {
652 case jmodels.CommitOperationCreate:
653 raw := json.RawMessage(e.Commit.Record)
654 record := tangled.KnotMember{}
655 err = json.Unmarshal(raw, &record)
656 if err != nil {
657 l.Error("invalid record", "err", err)
658 return err
659 }
660
661 // only knot owner can invite to knots
662 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
663 if err != nil || !ok {
664 return fmt.Errorf("failed to enforce permissions: %w", err)
665 }
666
667 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
668 if err != nil {
669 return err
670 }
671
672 if memberId.Handle.IsInvalidHandle() {
673 return err
674 }
675
676 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
677 if err != nil {
678 return fmt.Errorf("failed to update ACLs: %w", err)
679 }
680
681 l.Info("added knot member")
682 case jmodels.CommitOperationDelete:
683 // we don't store knot members in a table (like we do for spindle)
684 // and we can't remove this just yet. possibly fixed if we switch
685 // to either:
686 // 1. a knot_members table like with spindle and store the rkey
687 // 2. use the knot host as the rkey
688 //
689 // TODO: implement member deletion
690 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
691 }
692
693 return nil
694}
695
696func (i *Ingester) ingestKnot(e *jmodels.Event) error {
697 did := e.Did
698 var err error
699
700 l := i.Logger.With("handler", "ingestKnot")
701 l = l.With("nsid", e.Commit.Collection)
702
703 switch e.Commit.Operation {
704 case jmodels.CommitOperationCreate:
705 raw := json.RawMessage(e.Commit.Record)
706 record := tangled.Knot{}
707 err = json.Unmarshal(raw, &record)
708 if err != nil {
709 l.Error("invalid record", "err", err)
710 return err
711 }
712
713 domain := e.Commit.RKey
714
715 ddb, ok := i.Db.Execer.(*db.DB)
716 if !ok {
717 return fmt.Errorf("failed to index profile record, invalid db cast")
718 }
719
720 err := db.AddKnot(ddb, domain, did)
721 if err != nil {
722 l.Error("failed to add knot to db", "err", err, "domain", domain)
723 return err
724 }
725
726 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
727 if err != nil {
728 l.Error("failed to verify knot", "err", err, "domain", domain)
729 return err
730 }
731
732 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
733 if err != nil {
734 return fmt.Errorf("failed to mark verified: %w", err)
735 }
736
737 return nil
738
739 case jmodels.CommitOperationDelete:
740 domain := e.Commit.RKey
741
742 ddb, ok := i.Db.Execer.(*db.DB)
743 if !ok {
744 return fmt.Errorf("failed to index knot record, invalid db cast")
745 }
746
747 // get record from db first
748 registrations, err := db.GetRegistrations(
749 ddb,
750 orm.FilterEq("domain", domain),
751 orm.FilterEq("did", did),
752 )
753 if err != nil {
754 return fmt.Errorf("failed to get registration: %w", err)
755 }
756 if len(registrations) != 1 {
757 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
758 }
759 registration := registrations[0]
760
761 tx, err := ddb.Begin()
762 if err != nil {
763 return err
764 }
765 defer func() {
766 tx.Rollback()
767 i.Enforcer.E.LoadPolicy()
768 }()
769
770 err = db.DeleteKnot(
771 tx,
772 orm.FilterEq("did", did),
773 orm.FilterEq("domain", domain),
774 )
775 if err != nil {
776 return err
777 }
778
779 if registration.Registered != nil {
780 err = i.Enforcer.RemoveKnot(domain)
781 if err != nil {
782 return err
783 }
784 }
785
786 err = tx.Commit()
787 if err != nil {
788 return err
789 }
790
791 err = i.Enforcer.E.SavePolicy()
792 if err != nil {
793 return err
794 }
795 }
796
797 return nil
798}
799func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
800 did := e.Did
801 rkey := e.Commit.RKey
802
803 var err error
804
805 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
806 l.Info("ingesting record")
807
808 ddb, ok := i.Db.Execer.(*db.DB)
809 if !ok {
810 return fmt.Errorf("failed to index issue record, invalid db cast")
811 }
812
813 switch e.Commit.Operation {
814 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
815 raw := json.RawMessage(e.Commit.Record)
816 record := tangled.RepoIssue{}
817 err = json.Unmarshal(raw, &record)
818 if err != nil {
819 l.Error("invalid record", "err", err)
820 return err
821 }
822
823 issue := models.IssueFromRecord(did, rkey, record)
824
825 if err := i.Validator.ValidateIssue(&issue); err != nil {
826 return fmt.Errorf("failed to validate issue: %w", err)
827 }
828
829 tx, err := ddb.BeginTx(ctx, nil)
830 if err != nil {
831 l.Error("failed to begin transaction", "err", err)
832 return err
833 }
834 defer tx.Rollback()
835
836 err = db.PutIssue(tx, &issue)
837 if err != nil {
838 l.Error("failed to create issue", "err", err)
839 return err
840 }
841
842 err = tx.Commit()
843 if err != nil {
844 l.Error("failed to commit txn", "err", err)
845 return err
846 }
847
848 return nil
849
850 case jmodels.CommitOperationDelete:
851 tx, err := ddb.BeginTx(ctx, nil)
852 if err != nil {
853 l.Error("failed to begin transaction", "err", err)
854 return err
855 }
856 defer tx.Rollback()
857
858 if err := db.DeleteIssues(
859 tx,
860 did,
861 rkey,
862 ); err != nil {
863 l.Error("failed to delete", "err", err)
864 return fmt.Errorf("failed to delete issue record: %w", err)
865 }
866 if err := tx.Commit(); err != nil {
867 l.Error("failed to commit txn", "err", err)
868 return err
869 }
870
871 return nil
872 }
873
874 return nil
875}
876
877func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
878 did := e.Did
879 rkey := e.Commit.RKey
880
881 var err error
882
883 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
884 l.Info("ingesting record")
885
886 ddb, ok := i.Db.Execer.(*db.DB)
887 if !ok {
888 return fmt.Errorf("failed to index issue comment record, invalid db cast")
889 }
890
891 switch e.Commit.Operation {
892 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
893 raw := json.RawMessage(e.Commit.Record)
894 record := tangled.RepoIssueComment{}
895 err = json.Unmarshal(raw, &record)
896 if err != nil {
897 return fmt.Errorf("invalid record: %w", err)
898 }
899
900 comment, err := models.IssueCommentFromRecord(did, rkey, record)
901 if err != nil {
902 return fmt.Errorf("failed to parse comment from record: %w", err)
903 }
904
905 if err := i.Validator.ValidateIssueComment(comment); err != nil {
906 return fmt.Errorf("failed to validate comment: %w", err)
907 }
908
909 tx, err := ddb.Begin()
910 if err != nil {
911 return fmt.Errorf("failed to start transaction: %w", err)
912 }
913 defer tx.Rollback()
914
915 _, err = db.AddIssueComment(tx, *comment)
916 if err != nil {
917 return fmt.Errorf("failed to create issue comment: %w", err)
918 }
919
920 return tx.Commit()
921
922 case jmodels.CommitOperationDelete:
923 if err := db.DeleteIssueComments(
924 ddb,
925 orm.FilterEq("did", did),
926 orm.FilterEq("rkey", rkey),
927 ); err != nil {
928 return fmt.Errorf("failed to delete issue comment record: %w", err)
929 }
930
931 return nil
932 }
933
934 return nil
935}
936
937func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
938 did := e.Did
939 rkey := e.Commit.RKey
940
941 var err error
942
943 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
944 l.Info("ingesting record")
945
946 ddb, ok := i.Db.Execer.(*db.DB)
947 if !ok {
948 return fmt.Errorf("failed to index label definition, invalid db cast")
949 }
950
951 switch e.Commit.Operation {
952 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
953 raw := json.RawMessage(e.Commit.Record)
954 record := tangled.LabelDefinition{}
955 err = json.Unmarshal(raw, &record)
956 if err != nil {
957 return fmt.Errorf("invalid record: %w", err)
958 }
959
960 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
961 if err != nil {
962 return fmt.Errorf("failed to parse labeldef from record: %w", err)
963 }
964
965 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
966 return fmt.Errorf("failed to validate labeldef: %w", err)
967 }
968
969 _, err = db.AddLabelDefinition(ddb, def)
970 if err != nil {
971 return fmt.Errorf("failed to create labeldef: %w", err)
972 }
973
974 return nil
975
976 case jmodels.CommitOperationDelete:
977 if err := db.DeleteLabelDefinition(
978 ddb,
979 orm.FilterEq("did", did),
980 orm.FilterEq("rkey", rkey),
981 ); err != nil {
982 return fmt.Errorf("failed to delete labeldef record: %w", err)
983 }
984
985 return nil
986 }
987
988 return nil
989}
990
991func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
992 did := e.Did
993 rkey := e.Commit.RKey
994
995 var err error
996
997 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
998 l.Info("ingesting record")
999
1000 ddb, ok := i.Db.Execer.(*db.DB)
1001 if !ok {
1002 return fmt.Errorf("failed to index label op, invalid db cast")
1003 }
1004
1005 switch e.Commit.Operation {
1006 case jmodels.CommitOperationCreate:
1007 raw := json.RawMessage(e.Commit.Record)
1008 record := tangled.LabelOp{}
1009 err = json.Unmarshal(raw, &record)
1010 if err != nil {
1011 return fmt.Errorf("invalid record: %w", err)
1012 }
1013
1014 subject := syntax.ATURI(record.Subject)
1015 collection := subject.Collection()
1016
1017 var repo *models.Repo
1018 switch collection {
1019 case tangled.RepoIssueNSID:
1020 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1021 if err != nil || len(i) != 1 {
1022 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1023 }
1024 repo = i[0].Repo
1025 default:
1026 return fmt.Errorf("unsupport label subject: %s", collection)
1027 }
1028
1029 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1030 if err != nil {
1031 return fmt.Errorf("failed to build label application ctx: %w", err)
1032 }
1033
1034 ops := models.LabelOpsFromRecord(did, rkey, record)
1035
1036 for _, o := range ops {
1037 def, ok := actx.Defs[o.OperandKey]
1038 if !ok {
1039 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1040 }
1041 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1042 return fmt.Errorf("failed to validate labelop: %w", err)
1043 }
1044 }
1045
1046 tx, err := ddb.Begin()
1047 if err != nil {
1048 return err
1049 }
1050 defer tx.Rollback()
1051
1052 for _, o := range ops {
1053 _, err = db.AddLabelOp(tx, &o)
1054 if err != nil {
1055 return fmt.Errorf("failed to add labelop: %w", err)
1056 }
1057 }
1058
1059 if err = tx.Commit(); err != nil {
1060 return err
1061 }
1062 }
1063
1064 return nil
1065}