forked from
tangled.org/core
Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/appview/validator"
23 "tangled.org/core/idresolver"
24 "tangled.org/core/orm"
25 "tangled.org/core/rbac"
26)
27
28type Ingester struct {
29 Db db.DbWrapper
30 Enforcer *rbac.Enforcer
31 IdResolver *idresolver.Resolver
32 Config *config.Config
33 Logger *slog.Logger
34 Validator *validator.Validator
35}
36
37type processFunc func(ctx context.Context, e *jmodels.Event) error
38
39func (i *Ingester) Ingest() processFunc {
40 return func(ctx context.Context, e *jmodels.Event) error {
41 var err error
42 defer func() {
43 eventTime := e.TimeUS
44 lastTimeUs := eventTime + 1
45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
47 }
48 }()
49
50 l := i.Logger.With("kind", e.Kind)
51 switch e.Kind {
52 case jmodels.EventKindAccount:
53 if !e.Account.Active && *e.Account.Status == "deactivated" {
54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
55 }
56 case jmodels.EventKindIdentity:
57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
58 case jmodels.EventKindCommit:
59 switch e.Commit.Collection {
60 case tangled.GraphFollowNSID:
61 err = i.ingestFollow(e)
62 case tangled.FeedStarNSID:
63 err = i.ingestStar(e)
64 case tangled.PublicKeyNSID:
65 err = i.ingestPublicKey(e)
66 case tangled.RepoArtifactNSID:
67 err = i.ingestArtifact(e)
68 case tangled.ActorProfileNSID:
69 err = i.ingestProfile(ctx, e)
70 case tangled.SpindleMemberNSID:
71 err = i.ingestSpindleMember(ctx, e)
72 case tangled.SpindleNSID:
73 err = i.ingestSpindle(ctx, e)
74 case tangled.KnotMemberNSID:
75 err = i.ingestKnotMember(e)
76 case tangled.KnotNSID:
77 err = i.ingestKnot(e)
78 case tangled.StringNSID:
79 err = i.ingestString(e)
80 case tangled.RepoIssueNSID:
81 err = i.ingestIssue(ctx, e)
82 case tangled.RepoIssueCommentNSID:
83 err = i.ingestIssueComment(e)
84 case tangled.LabelDefinitionNSID:
85 err = i.ingestLabelDefinition(e)
86 case tangled.LabelOpNSID:
87 err = i.ingestLabelOp(e)
88 }
89 l = i.Logger.With("nsid", e.Commit.Collection)
90 }
91
92 if err != nil {
93 l.Warn("refused to ingest record", "err", err)
94 }
95
96 return nil
97 }
98}
99
100func (i *Ingester) ingestStar(e *jmodels.Event) error {
101 var err error
102 did := e.Did
103
104 l := i.Logger.With("handler", "ingestStar")
105 l = l.With("nsid", e.Commit.Collection)
106
107 switch e.Commit.Operation {
108 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
109 var subjectUri syntax.ATURI
110
111 raw := json.RawMessage(e.Commit.Record)
112 record := tangled.FeedStar{}
113 err := json.Unmarshal(raw, &record)
114 if err != nil {
115 l.Error("invalid record", "err", err)
116 return err
117 }
118
119 subjectUri, err = syntax.ParseATURI(record.Subject)
120 if err != nil {
121 l.Error("invalid record", "err", err)
122 return err
123 }
124 err = db.AddStar(i.Db, &models.Star{
125 Did: did,
126 RepoAt: subjectUri,
127 Rkey: e.Commit.RKey,
128 })
129 case jmodels.CommitOperationDelete:
130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
131 }
132
133 if err != nil {
134 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
135 }
136
137 return nil
138}
139
140func (i *Ingester) ingestFollow(e *jmodels.Event) error {
141 var err error
142 did := e.Did
143
144 l := i.Logger.With("handler", "ingestFollow")
145 l = l.With("nsid", e.Commit.Collection)
146
147 switch e.Commit.Operation {
148 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
149 raw := json.RawMessage(e.Commit.Record)
150 record := tangled.GraphFollow{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 l.Error("invalid record", "err", err)
154 return err
155 }
156
157 err = db.AddFollow(i.Db, &models.Follow{
158 UserDid: did,
159 SubjectDid: record.Subject,
160 Rkey: e.Commit.RKey,
161 })
162 case jmodels.CommitOperationDelete:
163 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
164 }
165
166 if err != nil {
167 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
168 }
169
170 return nil
171}
172
173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
174 did := e.Did
175 var err error
176
177 l := i.Logger.With("handler", "ingestPublicKey")
178 l = l.With("nsid", e.Commit.Collection)
179
180 switch e.Commit.Operation {
181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
182 l.Debug("processing add of pubkey")
183 raw := json.RawMessage(e.Commit.Record)
184 record := tangled.PublicKey{}
185 err = json.Unmarshal(raw, &record)
186 if err != nil {
187 l.Error("invalid record", "err", err)
188 return err
189 }
190
191 name := record.Name
192 key := record.Key
193 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
194 case jmodels.CommitOperationDelete:
195 l.Debug("processing delete of pubkey")
196 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
197 }
198
199 if err != nil {
200 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
201 }
202
203 return nil
204}
205
206func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
207 did := e.Did
208 var err error
209
210 l := i.Logger.With("handler", "ingestArtifact")
211 l = l.With("nsid", e.Commit.Collection)
212
213 switch e.Commit.Operation {
214 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
215 raw := json.RawMessage(e.Commit.Record)
216 record := tangled.RepoArtifact{}
217 err = json.Unmarshal(raw, &record)
218 if err != nil {
219 l.Error("invalid record", "err", err)
220 return err
221 }
222
223 repoAt, err := syntax.ParseATURI(record.Repo)
224 if err != nil {
225 return err
226 }
227
228 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
229 if err != nil {
230 return err
231 }
232
233 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
234 if err != nil || !ok {
235 return err
236 }
237
238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
239 if err != nil {
240 createdAt = time.Now()
241 }
242
243 artifact := models.Artifact{
244 Did: did,
245 Rkey: e.Commit.RKey,
246 RepoAt: repoAt,
247 Tag: plumbing.Hash(record.Tag),
248 CreatedAt: createdAt,
249 BlobCid: cid.Cid(record.Artifact.Ref),
250 Name: record.Name,
251 Size: uint64(record.Artifact.Size),
252 MimeType: record.Artifact.MimeType,
253 }
254
255 err = db.AddArtifact(i.Db, artifact)
256 case jmodels.CommitOperationDelete:
257 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
258 }
259
260 if err != nil {
261 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
262 }
263
264 return nil
265}
266
267func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
268 did := e.Did
269 var err error
270
271 l := i.Logger.With("handler", "ingestProfile")
272 l = l.With("nsid", e.Commit.Collection)
273
274 if e.Commit.RKey != "self" {
275 return fmt.Errorf("ingestProfile only ingests `self` record")
276 }
277
278 switch e.Commit.Operation {
279 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
280 raw := json.RawMessage(e.Commit.Record)
281 record := tangled.ActorProfile{}
282 err = json.Unmarshal(raw, &record)
283 if err != nil {
284 l.Error("invalid record", "err", err)
285 return err
286 }
287
288 avatar := ""
289 if record.Avatar != nil {
290 avatar = record.Avatar.Ref.String()
291 }
292
293 description := ""
294 if record.Description != nil {
295 description = *record.Description
296 }
297
298 includeBluesky := record.Bluesky
299
300 pronouns := ""
301 if record.Pronouns != nil {
302 pronouns = *record.Pronouns
303 }
304
305 location := ""
306 if record.Location != nil {
307 location = *record.Location
308 }
309
310 var links [5]string
311 for i, l := range record.Links {
312 if i < 5 {
313 links[i] = l
314 }
315 }
316
317 var stats [2]models.VanityStat
318 for i, s := range record.Stats {
319 if i < 2 {
320 stats[i].Kind = models.ParseVanityStatKind(s)
321 }
322 }
323
324 var pinned [6]syntax.ATURI
325 for i, r := range record.PinnedRepositories {
326 if i < 6 {
327 pinned[i] = syntax.ATURI(r)
328 }
329 }
330
331 var preferredHandle syntax.Handle
332 if record.PreferredHandle != nil {
333 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
334 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
335 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
336 preferredHandle = h
337 }
338 }
339 }
340
341 profile := models.Profile{
342 Did: did,
343 Avatar: avatar,
344 Description: description,
345 IncludeBluesky: includeBluesky,
346 Location: location,
347 Links: links,
348 Stats: stats,
349 PinnedRepos: pinned,
350 Pronouns: pronouns,
351 PreferredHandle: preferredHandle,
352 }
353
354 ddb, ok := i.Db.Execer.(*db.DB)
355 if !ok {
356 return fmt.Errorf("failed to index profile record, invalid db cast")
357 }
358
359 tx, err := ddb.Begin()
360 if err != nil {
361 return fmt.Errorf("failed to start transaction")
362 }
363
364 err = db.ValidateProfile(tx, &profile)
365 if err != nil {
366 return fmt.Errorf("invalid profile record")
367 }
368
369 err = db.UpsertProfile(tx, &profile)
370 case jmodels.CommitOperationDelete:
371 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
372 }
373
374 if err != nil {
375 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
376 }
377
378 return nil
379}
380
381func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
382 did := e.Did
383 var err error
384
385 l := i.Logger.With("handler", "ingestSpindleMember")
386 l = l.With("nsid", e.Commit.Collection)
387
388 switch e.Commit.Operation {
389 case jmodels.CommitOperationCreate:
390 raw := json.RawMessage(e.Commit.Record)
391 record := tangled.SpindleMember{}
392 err = json.Unmarshal(raw, &record)
393 if err != nil {
394 l.Error("invalid record", "err", err)
395 return err
396 }
397
398 // only spindle owner can invite to spindles
399 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
400 if err != nil || !ok {
401 return fmt.Errorf("failed to enforce permissions: %w", err)
402 }
403
404 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
405 if err != nil {
406 return err
407 }
408
409 if memberId.Handle.IsInvalidHandle() {
410 return err
411 }
412
413 ddb, ok := i.Db.Execer.(*db.DB)
414 if !ok {
415 return fmt.Errorf("invalid db cast")
416 }
417
418 err = db.AddSpindleMember(ddb, models.SpindleMember{
419 Did: syntax.DID(did),
420 Rkey: e.Commit.RKey,
421 Instance: record.Instance,
422 Subject: memberId.DID,
423 })
424 if !ok {
425 return fmt.Errorf("failed to add to db: %w", err)
426 }
427
428 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
429 if err != nil {
430 return fmt.Errorf("failed to update ACLs: %w", err)
431 }
432
433 l.Info("added spindle member")
434 case jmodels.CommitOperationDelete:
435 rkey := e.Commit.RKey
436
437 ddb, ok := i.Db.Execer.(*db.DB)
438 if !ok {
439 return fmt.Errorf("failed to index profile record, invalid db cast")
440 }
441
442 // get record from db first
443 members, err := db.GetSpindleMembers(
444 ddb,
445 orm.FilterEq("did", did),
446 orm.FilterEq("rkey", rkey),
447 )
448 if err != nil || len(members) != 1 {
449 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
450 }
451 member := members[0]
452
453 tx, err := ddb.Begin()
454 if err != nil {
455 return fmt.Errorf("failed to start txn: %w", err)
456 }
457
458 // remove record by rkey && update enforcer
459 if err = db.RemoveSpindleMember(
460 tx,
461 orm.FilterEq("did", did),
462 orm.FilterEq("rkey", rkey),
463 ); err != nil {
464 return fmt.Errorf("failed to remove from db: %w", err)
465 }
466
467 // update enforcer
468 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
469 if err != nil {
470 return fmt.Errorf("failed to update ACLs: %w", err)
471 }
472
473 if err = tx.Commit(); err != nil {
474 return fmt.Errorf("failed to commit txn: %w", err)
475 }
476
477 if err = i.Enforcer.E.SavePolicy(); err != nil {
478 return fmt.Errorf("failed to save ACLs: %w", err)
479 }
480
481 l.Info("removed spindle member")
482 }
483
484 return nil
485}
486
487func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
488 did := e.Did
489 var err error
490
491 l := i.Logger.With("handler", "ingestSpindle")
492 l = l.With("nsid", e.Commit.Collection)
493
494 switch e.Commit.Operation {
495 case jmodels.CommitOperationCreate:
496 raw := json.RawMessage(e.Commit.Record)
497 record := tangled.Spindle{}
498 err = json.Unmarshal(raw, &record)
499 if err != nil {
500 l.Error("invalid record", "err", err)
501 return err
502 }
503
504 instance := e.Commit.RKey
505
506 ddb, ok := i.Db.Execer.(*db.DB)
507 if !ok {
508 return fmt.Errorf("failed to index profile record, invalid db cast")
509 }
510
511 err := db.AddSpindle(ddb, models.Spindle{
512 Owner: syntax.DID(did),
513 Instance: instance,
514 })
515 if err != nil {
516 l.Error("failed to add spindle to db", "err", err, "instance", instance)
517 return err
518 }
519
520 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
521 if err != nil {
522 l.Error("failed to add spindle to db", "err", err, "instance", instance)
523 return err
524 }
525
526 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
527 if err != nil {
528 return fmt.Errorf("failed to mark verified: %w", err)
529 }
530
531 return nil
532
533 case jmodels.CommitOperationDelete:
534 instance := e.Commit.RKey
535
536 ddb, ok := i.Db.Execer.(*db.DB)
537 if !ok {
538 return fmt.Errorf("failed to index profile record, invalid db cast")
539 }
540
541 // get record from db first
542 spindles, err := db.GetSpindles(
543 ctx,
544 ddb,
545 orm.FilterEq("owner", did),
546 orm.FilterEq("instance", instance),
547 )
548 if err != nil || len(spindles) != 1 {
549 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
550 }
551 spindle := spindles[0]
552
553 tx, err := ddb.Begin()
554 if err != nil {
555 return err
556 }
557 defer func() {
558 tx.Rollback()
559 i.Enforcer.E.LoadPolicy()
560 }()
561
562 // remove spindle members first
563 err = db.RemoveSpindleMember(
564 tx,
565 orm.FilterEq("owner", did),
566 orm.FilterEq("instance", instance),
567 )
568 if err != nil {
569 return err
570 }
571
572 err = db.DeleteSpindle(
573 tx,
574 orm.FilterEq("owner", did),
575 orm.FilterEq("instance", instance),
576 )
577 if err != nil {
578 return err
579 }
580
581 if spindle.Verified != nil {
582 err = i.Enforcer.RemoveSpindle(instance)
583 if err != nil {
584 return err
585 }
586 }
587
588 err = tx.Commit()
589 if err != nil {
590 return err
591 }
592
593 err = i.Enforcer.E.SavePolicy()
594 if err != nil {
595 return err
596 }
597 }
598
599 return nil
600}
601
602func (i *Ingester) ingestString(e *jmodels.Event) error {
603 did := e.Did
604 rkey := e.Commit.RKey
605
606 var err error
607
608 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
609 l.Info("ingesting record")
610
611 ddb, ok := i.Db.Execer.(*db.DB)
612 if !ok {
613 return fmt.Errorf("failed to index string record, invalid db cast")
614 }
615
616 switch e.Commit.Operation {
617 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
618 raw := json.RawMessage(e.Commit.Record)
619 record := tangled.String{}
620 err = json.Unmarshal(raw, &record)
621 if err != nil {
622 l.Error("invalid record", "err", err)
623 return err
624 }
625
626 string := models.StringFromRecord(did, rkey, record)
627
628 if err = i.Validator.ValidateString(&string); err != nil {
629 l.Error("invalid record", "err", err)
630 return err
631 }
632
633 if err = db.AddString(ddb, string); err != nil {
634 l.Error("failed to add string", "err", err)
635 return err
636 }
637
638 return nil
639
640 case jmodels.CommitOperationDelete:
641 if err := db.DeleteString(
642 ddb,
643 orm.FilterEq("did", did),
644 orm.FilterEq("rkey", rkey),
645 ); err != nil {
646 l.Error("failed to delete", "err", err)
647 return fmt.Errorf("failed to delete string record: %w", err)
648 }
649
650 return nil
651 }
652
653 return nil
654}
655
656func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
657 did := e.Did
658 var err error
659
660 l := i.Logger.With("handler", "ingestKnotMember")
661 l = l.With("nsid", e.Commit.Collection)
662
663 switch e.Commit.Operation {
664 case jmodels.CommitOperationCreate:
665 raw := json.RawMessage(e.Commit.Record)
666 record := tangled.KnotMember{}
667 err = json.Unmarshal(raw, &record)
668 if err != nil {
669 l.Error("invalid record", "err", err)
670 return err
671 }
672
673 // only knot owner can invite to knots
674 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
675 if err != nil || !ok {
676 return fmt.Errorf("failed to enforce permissions: %w", err)
677 }
678
679 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
680 if err != nil {
681 return err
682 }
683
684 if memberId.Handle.IsInvalidHandle() {
685 return err
686 }
687
688 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
689 if err != nil {
690 return fmt.Errorf("failed to update ACLs: %w", err)
691 }
692
693 l.Info("added knot member")
694 case jmodels.CommitOperationDelete:
695 // we don't store knot members in a table (like we do for spindle)
696 // and we can't remove this just yet. possibly fixed if we switch
697 // to either:
698 // 1. a knot_members table like with spindle and store the rkey
699 // 2. use the knot host as the rkey
700 //
701 // TODO: implement member deletion
702 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
703 }
704
705 return nil
706}
707
708func (i *Ingester) ingestKnot(e *jmodels.Event) error {
709 did := e.Did
710 var err error
711
712 l := i.Logger.With("handler", "ingestKnot")
713 l = l.With("nsid", e.Commit.Collection)
714
715 switch e.Commit.Operation {
716 case jmodels.CommitOperationCreate:
717 raw := json.RawMessage(e.Commit.Record)
718 record := tangled.Knot{}
719 err = json.Unmarshal(raw, &record)
720 if err != nil {
721 l.Error("invalid record", "err", err)
722 return err
723 }
724
725 domain := e.Commit.RKey
726
727 ddb, ok := i.Db.Execer.(*db.DB)
728 if !ok {
729 return fmt.Errorf("failed to index profile record, invalid db cast")
730 }
731
732 err := db.AddKnot(ddb, domain, did)
733 if err != nil {
734 l.Error("failed to add knot to db", "err", err, "domain", domain)
735 return err
736 }
737
738 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
739 if err != nil {
740 l.Error("failed to verify knot", "err", err, "domain", domain)
741 return err
742 }
743
744 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
745 if err != nil {
746 return fmt.Errorf("failed to mark verified: %w", err)
747 }
748
749 return nil
750
751 case jmodels.CommitOperationDelete:
752 domain := e.Commit.RKey
753
754 ddb, ok := i.Db.Execer.(*db.DB)
755 if !ok {
756 return fmt.Errorf("failed to index knot record, invalid db cast")
757 }
758
759 // get record from db first
760 registrations, err := db.GetRegistrations(
761 ddb,
762 orm.FilterEq("domain", domain),
763 orm.FilterEq("did", did),
764 )
765 if err != nil {
766 return fmt.Errorf("failed to get registration: %w", err)
767 }
768 if len(registrations) != 1 {
769 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
770 }
771 registration := registrations[0]
772
773 tx, err := ddb.Begin()
774 if err != nil {
775 return err
776 }
777 defer func() {
778 tx.Rollback()
779 i.Enforcer.E.LoadPolicy()
780 }()
781
782 err = db.DeleteKnot(
783 tx,
784 orm.FilterEq("did", did),
785 orm.FilterEq("domain", domain),
786 )
787 if err != nil {
788 return err
789 }
790
791 if registration.Registered != nil {
792 err = i.Enforcer.RemoveKnot(domain)
793 if err != nil {
794 return err
795 }
796 }
797
798 err = tx.Commit()
799 if err != nil {
800 return err
801 }
802
803 err = i.Enforcer.E.SavePolicy()
804 if err != nil {
805 return err
806 }
807 }
808
809 return nil
810}
811func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
812 did := e.Did
813 rkey := e.Commit.RKey
814
815 var err error
816
817 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
818 l.Info("ingesting record")
819
820 ddb, ok := i.Db.Execer.(*db.DB)
821 if !ok {
822 return fmt.Errorf("failed to index issue record, invalid db cast")
823 }
824
825 switch e.Commit.Operation {
826 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
827 raw := json.RawMessage(e.Commit.Record)
828 record := tangled.RepoIssue{}
829 err = json.Unmarshal(raw, &record)
830 if err != nil {
831 l.Error("invalid record", "err", err)
832 return err
833 }
834
835 issue := models.IssueFromRecord(did, rkey, record)
836
837 if err := i.Validator.ValidateIssue(&issue); err != nil {
838 return fmt.Errorf("failed to validate issue: %w", err)
839 }
840
841 tx, err := ddb.BeginTx(ctx, nil)
842 if err != nil {
843 l.Error("failed to begin transaction", "err", err)
844 return err
845 }
846 defer tx.Rollback()
847
848 err = db.PutIssue(tx, &issue)
849 if err != nil {
850 l.Error("failed to create issue", "err", err)
851 return err
852 }
853
854 err = tx.Commit()
855 if err != nil {
856 l.Error("failed to commit txn", "err", err)
857 return err
858 }
859
860 return nil
861
862 case jmodels.CommitOperationDelete:
863 tx, err := ddb.BeginTx(ctx, nil)
864 if err != nil {
865 l.Error("failed to begin transaction", "err", err)
866 return err
867 }
868 defer tx.Rollback()
869
870 if err := db.DeleteIssues(
871 tx,
872 did,
873 rkey,
874 ); err != nil {
875 l.Error("failed to delete", "err", err)
876 return fmt.Errorf("failed to delete issue record: %w", err)
877 }
878 if err := tx.Commit(); err != nil {
879 l.Error("failed to commit txn", "err", err)
880 return err
881 }
882
883 return nil
884 }
885
886 return nil
887}
888
889func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
890 did := e.Did
891 rkey := e.Commit.RKey
892
893 var err error
894
895 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
896 l.Info("ingesting record")
897
898 ddb, ok := i.Db.Execer.(*db.DB)
899 if !ok {
900 return fmt.Errorf("failed to index issue comment record, invalid db cast")
901 }
902
903 switch e.Commit.Operation {
904 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
905 raw := json.RawMessage(e.Commit.Record)
906 record := tangled.RepoIssueComment{}
907 err = json.Unmarshal(raw, &record)
908 if err != nil {
909 return fmt.Errorf("invalid record: %w", err)
910 }
911
912 comment, err := models.IssueCommentFromRecord(did, rkey, record)
913 if err != nil {
914 return fmt.Errorf("failed to parse comment from record: %w", err)
915 }
916
917 if err := i.Validator.ValidateIssueComment(comment); err != nil {
918 return fmt.Errorf("failed to validate comment: %w", err)
919 }
920
921 tx, err := ddb.Begin()
922 if err != nil {
923 return fmt.Errorf("failed to start transaction: %w", err)
924 }
925 defer tx.Rollback()
926
927 _, err = db.AddIssueComment(tx, *comment)
928 if err != nil {
929 return fmt.Errorf("failed to create issue comment: %w", err)
930 }
931
932 return tx.Commit()
933
934 case jmodels.CommitOperationDelete:
935 if err := db.DeleteIssueComments(
936 ddb,
937 orm.FilterEq("did", did),
938 orm.FilterEq("rkey", rkey),
939 ); err != nil {
940 return fmt.Errorf("failed to delete issue comment record: %w", err)
941 }
942
943 return nil
944 }
945
946 return nil
947}
948
949func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
950 did := e.Did
951 rkey := e.Commit.RKey
952
953 var err error
954
955 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
956 l.Info("ingesting record")
957
958 ddb, ok := i.Db.Execer.(*db.DB)
959 if !ok {
960 return fmt.Errorf("failed to index label definition, invalid db cast")
961 }
962
963 switch e.Commit.Operation {
964 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
965 raw := json.RawMessage(e.Commit.Record)
966 record := tangled.LabelDefinition{}
967 err = json.Unmarshal(raw, &record)
968 if err != nil {
969 return fmt.Errorf("invalid record: %w", err)
970 }
971
972 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
973 if err != nil {
974 return fmt.Errorf("failed to parse labeldef from record: %w", err)
975 }
976
977 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
978 return fmt.Errorf("failed to validate labeldef: %w", err)
979 }
980
981 _, err = db.AddLabelDefinition(ddb, def)
982 if err != nil {
983 return fmt.Errorf("failed to create labeldef: %w", err)
984 }
985
986 return nil
987
988 case jmodels.CommitOperationDelete:
989 if err := db.DeleteLabelDefinition(
990 ddb,
991 orm.FilterEq("did", did),
992 orm.FilterEq("rkey", rkey),
993 ); err != nil {
994 return fmt.Errorf("failed to delete labeldef record: %w", err)
995 }
996
997 return nil
998 }
999
1000 return nil
1001}
1002
1003func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1004 did := e.Did
1005 rkey := e.Commit.RKey
1006
1007 var err error
1008
1009 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1010 l.Info("ingesting record")
1011
1012 ddb, ok := i.Db.Execer.(*db.DB)
1013 if !ok {
1014 return fmt.Errorf("failed to index label op, invalid db cast")
1015 }
1016
1017 switch e.Commit.Operation {
1018 case jmodels.CommitOperationCreate:
1019 raw := json.RawMessage(e.Commit.Record)
1020 record := tangled.LabelOp{}
1021 err = json.Unmarshal(raw, &record)
1022 if err != nil {
1023 return fmt.Errorf("invalid record: %w", err)
1024 }
1025
1026 subject := syntax.ATURI(record.Subject)
1027 collection := subject.Collection()
1028
1029 var repo *models.Repo
1030 switch collection {
1031 case tangled.RepoIssueNSID:
1032 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1033 if err != nil || len(i) != 1 {
1034 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1035 }
1036 repo = i[0].Repo
1037 default:
1038 return fmt.Errorf("unsupport label subject: %s", collection)
1039 }
1040
1041 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1042 if err != nil {
1043 return fmt.Errorf("failed to build label application ctx: %w", err)
1044 }
1045
1046 ops := models.LabelOpsFromRecord(did, rkey, record)
1047
1048 for _, o := range ops {
1049 def, ok := actx.Defs[o.OperandKey]
1050 if !ok {
1051 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1052 }
1053 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1054 return fmt.Errorf("failed to validate labelop: %w", err)
1055 }
1056 }
1057
1058 tx, err := ddb.Begin()
1059 if err != nil {
1060 return err
1061 }
1062 defer tx.Rollback()
1063
1064 for _, o := range ops {
1065 _, err = db.AddLabelOp(tx, &o)
1066 if err != nil {
1067 return fmt.Errorf("failed to add labelop: %w", err)
1068 }
1069 }
1070
1071 if err = tx.Commit(); err != nil {
1072 return err
1073 }
1074 }
1075
1076 return nil
1077}