forked from
tangled.org/core
this repo has no description
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/appview/validator"
23 "tangled.org/core/idresolver"
24 "tangled.org/core/orm"
25 "tangled.org/core/rbac"
26)
27
28type Ingester struct {
29 Db db.DbWrapper
30 Enforcer *rbac.Enforcer
31 IdResolver *idresolver.Resolver
32 Config *config.Config
33 Logger *slog.Logger
34 Validator *validator.Validator
35}
36
37type processFunc func(ctx context.Context, e *jmodels.Event) error
38
39func (i *Ingester) Ingest() processFunc {
40 return func(ctx context.Context, e *jmodels.Event) error {
41 var err error
42 defer func() {
43 eventTime := e.TimeUS
44 lastTimeUs := eventTime + 1
45 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
46 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
47 }
48 }()
49
50 l := i.Logger.With("kind", e.Kind)
51 switch e.Kind {
52 case jmodels.EventKindAccount:
53 if !e.Account.Active && *e.Account.Status == "deactivated" {
54 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
55 }
56 case jmodels.EventKindIdentity:
57 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
58 case jmodels.EventKindCommit:
59 switch e.Commit.Collection {
60 case tangled.GraphFollowNSID:
61 err = i.ingestFollow(e)
62 case tangled.FeedStarNSID:
63 err = i.ingestStar(e)
64 case tangled.PublicKeyNSID:
65 err = i.ingestPublicKey(e)
66 case tangled.RepoArtifactNSID:
67 err = i.ingestArtifact(e)
68 case tangled.ActorProfileNSID:
69 err = i.ingestProfile(e)
70 case tangled.SpindleMemberNSID:
71 err = i.ingestSpindleMember(ctx, e)
72 case tangled.SpindleNSID:
73 err = i.ingestSpindle(ctx, e)
74 case tangled.KnotMemberNSID:
75 err = i.ingestKnotMember(e)
76 case tangled.KnotNSID:
77 err = i.ingestKnot(e)
78 case tangled.StringNSID:
79 err = i.ingestString(e)
80 case tangled.RepoIssueNSID:
81 err = i.ingestIssue(ctx, e)
82 case tangled.CommentNSID:
83 err = i.ingestComment(e)
84 case tangled.RepoIssueCommentNSID:
85 err = i.ingestIssueComment(e)
86 case tangled.LabelDefinitionNSID:
87 err = i.ingestLabelDefinition(e)
88 case tangled.LabelOpNSID:
89 err = i.ingestLabelOp(e)
90 }
91 l = i.Logger.With("nsid", e.Commit.Collection)
92 }
93
94 if err != nil {
95 l.Warn("refused to ingest record", "err", err)
96 }
97
98 return nil
99 }
100}
101
102func (i *Ingester) ingestStar(e *jmodels.Event) error {
103 var err error
104 did := e.Did
105
106 l := i.Logger.With("handler", "ingestStar")
107 l = l.With("nsid", e.Commit.Collection)
108
109 switch e.Commit.Operation {
110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
111 var subjectUri syntax.ATURI
112
113 raw := json.RawMessage(e.Commit.Record)
114 record := tangled.FeedStar{}
115 err := json.Unmarshal(raw, &record)
116 if err != nil {
117 l.Error("invalid record", "err", err)
118 return err
119 }
120
121 subjectUri, err = syntax.ParseATURI(record.Subject)
122 if err != nil {
123 l.Error("invalid record", "err", err)
124 return err
125 }
126 err = db.AddStar(i.Db, &models.Star{
127 Did: did,
128 RepoAt: subjectUri,
129 Rkey: e.Commit.RKey,
130 })
131 case jmodels.CommitOperationDelete:
132 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
133 }
134
135 if err != nil {
136 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
137 }
138
139 return nil
140}
141
142func (i *Ingester) ingestFollow(e *jmodels.Event) error {
143 var err error
144 did := e.Did
145
146 l := i.Logger.With("handler", "ingestFollow")
147 l = l.With("nsid", e.Commit.Collection)
148
149 switch e.Commit.Operation {
150 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
151 raw := json.RawMessage(e.Commit.Record)
152 record := tangled.GraphFollow{}
153 err = json.Unmarshal(raw, &record)
154 if err != nil {
155 l.Error("invalid record", "err", err)
156 return err
157 }
158
159 err = db.AddFollow(i.Db, &models.Follow{
160 UserDid: did,
161 SubjectDid: record.Subject,
162 Rkey: e.Commit.RKey,
163 })
164 case jmodels.CommitOperationDelete:
165 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
166 }
167
168 if err != nil {
169 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
170 }
171
172 return nil
173}
174
175func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
176 did := e.Did
177 var err error
178
179 l := i.Logger.With("handler", "ingestPublicKey")
180 l = l.With("nsid", e.Commit.Collection)
181
182 switch e.Commit.Operation {
183 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
184 l.Debug("processing add of pubkey")
185 raw := json.RawMessage(e.Commit.Record)
186 record := tangled.PublicKey{}
187 err = json.Unmarshal(raw, &record)
188 if err != nil {
189 l.Error("invalid record", "err", err)
190 return err
191 }
192
193 name := record.Name
194 key := record.Key
195 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
196 case jmodels.CommitOperationDelete:
197 l.Debug("processing delete of pubkey")
198 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
199 }
200
201 if err != nil {
202 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
203 }
204
205 return nil
206}
207
208func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
209 did := e.Did
210 var err error
211
212 l := i.Logger.With("handler", "ingestArtifact")
213 l = l.With("nsid", e.Commit.Collection)
214
215 switch e.Commit.Operation {
216 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
217 raw := json.RawMessage(e.Commit.Record)
218 record := tangled.RepoArtifact{}
219 err = json.Unmarshal(raw, &record)
220 if err != nil {
221 l.Error("invalid record", "err", err)
222 return err
223 }
224
225 repoAt, err := syntax.ParseATURI(record.Repo)
226 if err != nil {
227 return err
228 }
229
230 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
231 if err != nil {
232 return err
233 }
234
235 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
236 if err != nil || !ok {
237 return err
238 }
239
240 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
241 if err != nil {
242 createdAt = time.Now()
243 }
244
245 artifact := models.Artifact{
246 Did: did,
247 Rkey: e.Commit.RKey,
248 RepoAt: repoAt,
249 Tag: plumbing.Hash(record.Tag),
250 CreatedAt: createdAt,
251 BlobCid: cid.Cid(record.Artifact.Ref),
252 Name: record.Name,
253 Size: uint64(record.Artifact.Size),
254 MimeType: record.Artifact.MimeType,
255 }
256
257 err = db.AddArtifact(i.Db, artifact)
258 case jmodels.CommitOperationDelete:
259 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
260 }
261
262 if err != nil {
263 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
264 }
265
266 return nil
267}
268
269func (i *Ingester) ingestProfile(e *jmodels.Event) error {
270 did := e.Did
271 var err error
272
273 l := i.Logger.With("handler", "ingestProfile")
274 l = l.With("nsid", e.Commit.Collection)
275
276 if e.Commit.RKey != "self" {
277 return fmt.Errorf("ingestProfile only ingests `self` record")
278 }
279
280 switch e.Commit.Operation {
281 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
282 raw := json.RawMessage(e.Commit.Record)
283 record := tangled.ActorProfile{}
284 err = json.Unmarshal(raw, &record)
285 if err != nil {
286 l.Error("invalid record", "err", err)
287 return err
288 }
289
290 avatar := ""
291 if record.Avatar != nil {
292 avatar = record.Avatar.Ref.String()
293 }
294
295 description := ""
296 if record.Description != nil {
297 description = *record.Description
298 }
299
300 includeBluesky := record.Bluesky
301
302 pronouns := ""
303 if record.Pronouns != nil {
304 pronouns = *record.Pronouns
305 }
306
307 location := ""
308 if record.Location != nil {
309 location = *record.Location
310 }
311
312 var links [5]string
313 for i, l := range record.Links {
314 if i < 5 {
315 links[i] = l
316 }
317 }
318
319 var stats [2]models.VanityStat
320 for i, s := range record.Stats {
321 if i < 2 {
322 stats[i].Kind = models.VanityStatKind(s)
323 }
324 }
325
326 var pinned [6]syntax.ATURI
327 for i, r := range record.PinnedRepositories {
328 if i < 6 {
329 pinned[i] = syntax.ATURI(r)
330 }
331 }
332
333 profile := models.Profile{
334 Did: did,
335 Avatar: avatar,
336 Description: description,
337 IncludeBluesky: includeBluesky,
338 Location: location,
339 Links: links,
340 Stats: stats,
341 PinnedRepos: pinned,
342 Pronouns: pronouns,
343 }
344
345 ddb, ok := i.Db.Execer.(*db.DB)
346 if !ok {
347 return fmt.Errorf("failed to index profile record, invalid db cast")
348 }
349
350 tx, err := ddb.Begin()
351 if err != nil {
352 return fmt.Errorf("failed to start transaction")
353 }
354
355 err = db.ValidateProfile(tx, &profile)
356 if err != nil {
357 return fmt.Errorf("invalid profile record")
358 }
359
360 err = db.UpsertProfile(tx, &profile)
361 case jmodels.CommitOperationDelete:
362 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
363 }
364
365 if err != nil {
366 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
367 }
368
369 return nil
370}
371
372func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
373 did := e.Did
374 var err error
375
376 l := i.Logger.With("handler", "ingestSpindleMember")
377 l = l.With("nsid", e.Commit.Collection)
378
379 switch e.Commit.Operation {
380 case jmodels.CommitOperationCreate:
381 raw := json.RawMessage(e.Commit.Record)
382 record := tangled.SpindleMember{}
383 err = json.Unmarshal(raw, &record)
384 if err != nil {
385 l.Error("invalid record", "err", err)
386 return err
387 }
388
389 // only spindle owner can invite to spindles
390 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
391 if err != nil || !ok {
392 return fmt.Errorf("failed to enforce permissions: %w", err)
393 }
394
395 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
396 if err != nil {
397 return err
398 }
399
400 if memberId.Handle.IsInvalidHandle() {
401 return err
402 }
403
404 ddb, ok := i.Db.Execer.(*db.DB)
405 if !ok {
406 return fmt.Errorf("failed to index profile record, invalid db cast")
407 }
408
409 err = db.AddSpindleMember(ddb, models.SpindleMember{
410 Did: syntax.DID(did),
411 Rkey: e.Commit.RKey,
412 Instance: record.Instance,
413 Subject: memberId.DID,
414 })
415 if !ok {
416 return fmt.Errorf("failed to add to db: %w", err)
417 }
418
419 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
420 if err != nil {
421 return fmt.Errorf("failed to update ACLs: %w", err)
422 }
423
424 l.Info("added spindle member")
425 case jmodels.CommitOperationDelete:
426 rkey := e.Commit.RKey
427
428 ddb, ok := i.Db.Execer.(*db.DB)
429 if !ok {
430 return fmt.Errorf("failed to index profile record, invalid db cast")
431 }
432
433 // get record from db first
434 members, err := db.GetSpindleMembers(
435 ddb,
436 orm.FilterEq("did", did),
437 orm.FilterEq("rkey", rkey),
438 )
439 if err != nil || len(members) != 1 {
440 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
441 }
442 member := members[0]
443
444 tx, err := ddb.Begin()
445 if err != nil {
446 return fmt.Errorf("failed to start txn: %w", err)
447 }
448
449 // remove record by rkey && update enforcer
450 if err = db.RemoveSpindleMember(
451 tx,
452 orm.FilterEq("did", did),
453 orm.FilterEq("rkey", rkey),
454 ); err != nil {
455 return fmt.Errorf("failed to remove from db: %w", err)
456 }
457
458 // update enforcer
459 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
460 if err != nil {
461 return fmt.Errorf("failed to update ACLs: %w", err)
462 }
463
464 if err = tx.Commit(); err != nil {
465 return fmt.Errorf("failed to commit txn: %w", err)
466 }
467
468 if err = i.Enforcer.E.SavePolicy(); err != nil {
469 return fmt.Errorf("failed to save ACLs: %w", err)
470 }
471
472 l.Info("removed spindle member")
473 }
474
475 return nil
476}
477
478func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
479 did := e.Did
480 var err error
481
482 l := i.Logger.With("handler", "ingestSpindle")
483 l = l.With("nsid", e.Commit.Collection)
484
485 switch e.Commit.Operation {
486 case jmodels.CommitOperationCreate:
487 raw := json.RawMessage(e.Commit.Record)
488 record := tangled.Spindle{}
489 err = json.Unmarshal(raw, &record)
490 if err != nil {
491 l.Error("invalid record", "err", err)
492 return err
493 }
494
495 instance := e.Commit.RKey
496
497 ddb, ok := i.Db.Execer.(*db.DB)
498 if !ok {
499 return fmt.Errorf("failed to index profile record, invalid db cast")
500 }
501
502 err := db.AddSpindle(ddb, models.Spindle{
503 Owner: syntax.DID(did),
504 Instance: instance,
505 })
506 if err != nil {
507 l.Error("failed to add spindle to db", "err", err, "instance", instance)
508 return err
509 }
510
511 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
512 if err != nil {
513 l.Error("failed to add spindle to db", "err", err, "instance", instance)
514 return err
515 }
516
517 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
518 if err != nil {
519 return fmt.Errorf("failed to mark verified: %w", err)
520 }
521
522 return nil
523
524 case jmodels.CommitOperationDelete:
525 instance := e.Commit.RKey
526
527 ddb, ok := i.Db.Execer.(*db.DB)
528 if !ok {
529 return fmt.Errorf("failed to index profile record, invalid db cast")
530 }
531
532 // get record from db first
533 spindles, err := db.GetSpindles(
534 ddb,
535 orm.FilterEq("owner", did),
536 orm.FilterEq("instance", instance),
537 )
538 if err != nil || len(spindles) != 1 {
539 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
540 }
541 spindle := spindles[0]
542
543 tx, err := ddb.Begin()
544 if err != nil {
545 return err
546 }
547 defer func() {
548 tx.Rollback()
549 i.Enforcer.E.LoadPolicy()
550 }()
551
552 // remove spindle members first
553 err = db.RemoveSpindleMember(
554 tx,
555 orm.FilterEq("owner", did),
556 orm.FilterEq("instance", instance),
557 )
558 if err != nil {
559 return err
560 }
561
562 err = db.DeleteSpindle(
563 tx,
564 orm.FilterEq("owner", did),
565 orm.FilterEq("instance", instance),
566 )
567 if err != nil {
568 return err
569 }
570
571 if spindle.Verified != nil {
572 err = i.Enforcer.RemoveSpindle(instance)
573 if err != nil {
574 return err
575 }
576 }
577
578 err = tx.Commit()
579 if err != nil {
580 return err
581 }
582
583 err = i.Enforcer.E.SavePolicy()
584 if err != nil {
585 return err
586 }
587 }
588
589 return nil
590}
591
592func (i *Ingester) ingestString(e *jmodels.Event) error {
593 did := e.Did
594 rkey := e.Commit.RKey
595
596 var err error
597
598 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
599 l.Info("ingesting record")
600
601 ddb, ok := i.Db.Execer.(*db.DB)
602 if !ok {
603 return fmt.Errorf("failed to index string record, invalid db cast")
604 }
605
606 switch e.Commit.Operation {
607 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
608 raw := json.RawMessage(e.Commit.Record)
609 record := tangled.String{}
610 err = json.Unmarshal(raw, &record)
611 if err != nil {
612 l.Error("invalid record", "err", err)
613 return err
614 }
615
616 string := models.StringFromRecord(did, rkey, record)
617
618 if err = i.Validator.ValidateString(&string); err != nil {
619 l.Error("invalid record", "err", err)
620 return err
621 }
622
623 if err = db.AddString(ddb, string); err != nil {
624 l.Error("failed to add string", "err", err)
625 return err
626 }
627
628 return nil
629
630 case jmodels.CommitOperationDelete:
631 if err := db.DeleteString(
632 ddb,
633 orm.FilterEq("did", did),
634 orm.FilterEq("rkey", rkey),
635 ); err != nil {
636 l.Error("failed to delete", "err", err)
637 return fmt.Errorf("failed to delete string record: %w", err)
638 }
639
640 return nil
641 }
642
643 return nil
644}
645
646func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
647 did := e.Did
648 var err error
649
650 l := i.Logger.With("handler", "ingestKnotMember")
651 l = l.With("nsid", e.Commit.Collection)
652
653 switch e.Commit.Operation {
654 case jmodels.CommitOperationCreate:
655 raw := json.RawMessage(e.Commit.Record)
656 record := tangled.KnotMember{}
657 err = json.Unmarshal(raw, &record)
658 if err != nil {
659 l.Error("invalid record", "err", err)
660 return err
661 }
662
663 // only knot owner can invite to knots
664 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
665 if err != nil || !ok {
666 return fmt.Errorf("failed to enforce permissions: %w", err)
667 }
668
669 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
670 if err != nil {
671 return err
672 }
673
674 if memberId.Handle.IsInvalidHandle() {
675 return err
676 }
677
678 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
679 if err != nil {
680 return fmt.Errorf("failed to update ACLs: %w", err)
681 }
682
683 l.Info("added knot member")
684 case jmodels.CommitOperationDelete:
685 // we don't store knot members in a table (like we do for spindle)
686 // and we can't remove this just yet. possibly fixed if we switch
687 // to either:
688 // 1. a knot_members table like with spindle and store the rkey
689 // 2. use the knot host as the rkey
690 //
691 // TODO: implement member deletion
692 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
693 }
694
695 return nil
696}
697
698func (i *Ingester) ingestKnot(e *jmodels.Event) error {
699 did := e.Did
700 var err error
701
702 l := i.Logger.With("handler", "ingestKnot")
703 l = l.With("nsid", e.Commit.Collection)
704
705 switch e.Commit.Operation {
706 case jmodels.CommitOperationCreate:
707 raw := json.RawMessage(e.Commit.Record)
708 record := tangled.Knot{}
709 err = json.Unmarshal(raw, &record)
710 if err != nil {
711 l.Error("invalid record", "err", err)
712 return err
713 }
714
715 domain := e.Commit.RKey
716
717 ddb, ok := i.Db.Execer.(*db.DB)
718 if !ok {
719 return fmt.Errorf("failed to index profile record, invalid db cast")
720 }
721
722 err := db.AddKnot(ddb, domain, did)
723 if err != nil {
724 l.Error("failed to add knot to db", "err", err, "domain", domain)
725 return err
726 }
727
728 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
729 if err != nil {
730 l.Error("failed to verify knot", "err", err, "domain", domain)
731 return err
732 }
733
734 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
735 if err != nil {
736 return fmt.Errorf("failed to mark verified: %w", err)
737 }
738
739 return nil
740
741 case jmodels.CommitOperationDelete:
742 domain := e.Commit.RKey
743
744 ddb, ok := i.Db.Execer.(*db.DB)
745 if !ok {
746 return fmt.Errorf("failed to index knot record, invalid db cast")
747 }
748
749 // get record from db first
750 registrations, err := db.GetRegistrations(
751 ddb,
752 orm.FilterEq("domain", domain),
753 orm.FilterEq("did", did),
754 )
755 if err != nil {
756 return fmt.Errorf("failed to get registration: %w", err)
757 }
758 if len(registrations) != 1 {
759 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
760 }
761 registration := registrations[0]
762
763 tx, err := ddb.Begin()
764 if err != nil {
765 return err
766 }
767 defer func() {
768 tx.Rollback()
769 i.Enforcer.E.LoadPolicy()
770 }()
771
772 err = db.DeleteKnot(
773 tx,
774 orm.FilterEq("did", did),
775 orm.FilterEq("domain", domain),
776 )
777 if err != nil {
778 return err
779 }
780
781 if registration.Registered != nil {
782 err = i.Enforcer.RemoveKnot(domain)
783 if err != nil {
784 return err
785 }
786 }
787
788 err = tx.Commit()
789 if err != nil {
790 return err
791 }
792
793 err = i.Enforcer.E.SavePolicy()
794 if err != nil {
795 return err
796 }
797 }
798
799 return nil
800}
801func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
802 did := e.Did
803 rkey := e.Commit.RKey
804
805 var err error
806
807 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
808 l.Info("ingesting record")
809
810 ddb, ok := i.Db.Execer.(*db.DB)
811 if !ok {
812 return fmt.Errorf("failed to index issue record, invalid db cast")
813 }
814
815 switch e.Commit.Operation {
816 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
817 raw := json.RawMessage(e.Commit.Record)
818 record := tangled.RepoIssue{}
819 err = json.Unmarshal(raw, &record)
820 if err != nil {
821 l.Error("invalid record", "err", err)
822 return err
823 }
824
825 issue := models.IssueFromRecord(did, rkey, record)
826
827 if err := i.Validator.ValidateIssue(&issue); err != nil {
828 return fmt.Errorf("failed to validate issue: %w", err)
829 }
830
831 tx, err := ddb.BeginTx(ctx, nil)
832 if err != nil {
833 l.Error("failed to begin transaction", "err", err)
834 return err
835 }
836 defer tx.Rollback()
837
838 err = db.PutIssue(tx, &issue)
839 if err != nil {
840 l.Error("failed to create issue", "err", err)
841 return err
842 }
843
844 err = tx.Commit()
845 if err != nil {
846 l.Error("failed to commit txn", "err", err)
847 return err
848 }
849
850 return nil
851
852 case jmodels.CommitOperationDelete:
853 tx, err := ddb.BeginTx(ctx, nil)
854 if err != nil {
855 l.Error("failed to begin transaction", "err", err)
856 return err
857 }
858 defer tx.Rollback()
859
860 if err := db.DeleteIssues(
861 tx,
862 did,
863 rkey,
864 ); err != nil {
865 l.Error("failed to delete", "err", err)
866 return fmt.Errorf("failed to delete issue record: %w", err)
867 }
868 if err := tx.Commit(); err != nil {
869 l.Error("failed to commit txn", "err", err)
870 return err
871 }
872
873 return nil
874 }
875
876 return nil
877}
878
879func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
880 did := e.Did
881 rkey := e.Commit.RKey
882
883 var err error
884
885 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
886 l.Info("ingesting record")
887
888 ddb, ok := i.Db.Execer.(*db.DB)
889 if !ok {
890 return fmt.Errorf("failed to index issue comment record, invalid db cast")
891 }
892
893 switch e.Commit.Operation {
894 case jmodels.CommitOperationUpdate:
895 raw := json.RawMessage(e.Commit.Record)
896 record := tangled.RepoIssueComment{}
897 err = json.Unmarshal(raw, &record)
898 if err != nil {
899 return fmt.Errorf("invalid record: %w", err)
900 }
901
902 // convert 'sh.tangled.repo.issue.comment' to 'sh.tangled.comment'
903 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), tangled.Comment{
904 Body: record.Body,
905 CreatedAt: record.CreatedAt,
906 Mentions: record.Mentions,
907 References: record.References,
908 ReplyTo: record.ReplyTo,
909 Subject: record.Issue,
910 })
911 if err != nil {
912 return fmt.Errorf("failed to parse comment from record: %w", err)
913 }
914
915 if err := comment.Validate(); err != nil {
916 return fmt.Errorf("failed to validate comment: %w", err)
917 }
918
919 tx, err := ddb.Begin()
920 if err != nil {
921 return fmt.Errorf("failed to start transaction: %w", err)
922 }
923 defer tx.Rollback()
924
925 err = db.PutComment(tx, comment)
926 if err != nil {
927 return fmt.Errorf("failed to create comment: %w", err)
928 }
929
930 return tx.Commit()
931
932 case jmodels.CommitOperationDelete:
933 if err := db.DeleteComments(
934 ddb,
935 orm.FilterEq("did", did),
936 orm.FilterEq("collection", e.Commit.Collection),
937 orm.FilterEq("rkey", rkey),
938 ); err != nil {
939 return fmt.Errorf("failed to delete issue comment record: %w", err)
940 }
941
942 return nil
943 }
944
945 return nil
946}
947
948func (i *Ingester) ingestComment(e *jmodels.Event) error {
949 did := e.Did
950 rkey := e.Commit.RKey
951
952 var err error
953
954 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
955 l.Info("ingesting record")
956
957 ddb, ok := i.Db.Execer.(*db.DB)
958 if !ok {
959 return fmt.Errorf("failed to index issue comment record, invalid db cast")
960 }
961
962 switch e.Commit.Operation {
963 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
964 raw := json.RawMessage(e.Commit.Record)
965 record := tangled.Comment{}
966 err = json.Unmarshal(raw, &record)
967 if err != nil {
968 return fmt.Errorf("invalid record: %w", err)
969 }
970
971 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), record)
972 if err != nil {
973 return fmt.Errorf("failed to parse comment from record: %w", err)
974 }
975
976 // TODO: ingest pull comments
977 // we aren't ingesting pull comments yet because pull itself isn't fully atprotated.
978 // so we cannot know which round this comment is pointing to
979 if comment.Subject.Collection().String() == tangled.RepoPullNSID {
980 l.Info("skip ingesting pull comments")
981 return nil
982 }
983
984 if err := comment.Validate(); err != nil {
985 return fmt.Errorf("failed to validate comment: %w", err)
986 }
987
988 tx, err := ddb.Begin()
989 if err != nil {
990 return fmt.Errorf("failed to start transaction: %w", err)
991 }
992 defer tx.Rollback()
993
994 err = db.PutComment(tx, comment)
995 if err != nil {
996 return fmt.Errorf("failed to create comment: %w", err)
997 }
998
999 return tx.Commit()
1000
1001 case jmodels.CommitOperationDelete:
1002 if err := db.DeleteComments(
1003 ddb,
1004 orm.FilterEq("did", did),
1005 orm.FilterEq("collection", e.Commit.Collection),
1006 orm.FilterEq("rkey", rkey),
1007 ); err != nil {
1008 return fmt.Errorf("failed to delete comment record: %w", err)
1009 }
1010
1011 return nil
1012 }
1013
1014 return nil
1015}
1016
1017func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1018 did := e.Did
1019 rkey := e.Commit.RKey
1020
1021 var err error
1022
1023 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1024 l.Info("ingesting record")
1025
1026 ddb, ok := i.Db.Execer.(*db.DB)
1027 if !ok {
1028 return fmt.Errorf("failed to index label definition, invalid db cast")
1029 }
1030
1031 switch e.Commit.Operation {
1032 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1033 raw := json.RawMessage(e.Commit.Record)
1034 record := tangled.LabelDefinition{}
1035 err = json.Unmarshal(raw, &record)
1036 if err != nil {
1037 return fmt.Errorf("invalid record: %w", err)
1038 }
1039
1040 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1041 if err != nil {
1042 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1043 }
1044
1045 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1046 return fmt.Errorf("failed to validate labeldef: %w", err)
1047 }
1048
1049 _, err = db.AddLabelDefinition(ddb, def)
1050 if err != nil {
1051 return fmt.Errorf("failed to create labeldef: %w", err)
1052 }
1053
1054 return nil
1055
1056 case jmodels.CommitOperationDelete:
1057 if err := db.DeleteLabelDefinition(
1058 ddb,
1059 orm.FilterEq("did", did),
1060 orm.FilterEq("rkey", rkey),
1061 ); err != nil {
1062 return fmt.Errorf("failed to delete labeldef record: %w", err)
1063 }
1064
1065 return nil
1066 }
1067
1068 return nil
1069}
1070
1071func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1072 did := e.Did
1073 rkey := e.Commit.RKey
1074
1075 var err error
1076
1077 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1078 l.Info("ingesting record")
1079
1080 ddb, ok := i.Db.Execer.(*db.DB)
1081 if !ok {
1082 return fmt.Errorf("failed to index label op, invalid db cast")
1083 }
1084
1085 switch e.Commit.Operation {
1086 case jmodels.CommitOperationCreate:
1087 raw := json.RawMessage(e.Commit.Record)
1088 record := tangled.LabelOp{}
1089 err = json.Unmarshal(raw, &record)
1090 if err != nil {
1091 return fmt.Errorf("invalid record: %w", err)
1092 }
1093
1094 subject := syntax.ATURI(record.Subject)
1095 collection := subject.Collection()
1096
1097 var repo *models.Repo
1098 switch collection {
1099 case tangled.RepoIssueNSID:
1100 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1101 if err != nil || len(i) != 1 {
1102 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1103 }
1104 repo = i[0].Repo
1105 default:
1106 return fmt.Errorf("unsupport label subject: %s", collection)
1107 }
1108
1109 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1110 if err != nil {
1111 return fmt.Errorf("failed to build label application ctx: %w", err)
1112 }
1113
1114 ops := models.LabelOpsFromRecord(did, rkey, record)
1115
1116 for _, o := range ops {
1117 def, ok := actx.Defs[o.OperandKey]
1118 if !ok {
1119 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1120 }
1121 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1122 return fmt.Errorf("failed to validate labelop: %w", err)
1123 }
1124 }
1125
1126 tx, err := ddb.Begin()
1127 if err != nil {
1128 return err
1129 }
1130 defer tx.Rollback()
1131
1132 for _, o := range ops {
1133 _, err = db.AddLabelOp(tx, &o)
1134 if err != nil {
1135 return fmt.Errorf("failed to add labelop: %w", err)
1136 }
1137 }
1138
1139 if err = tx.Commit(); err != nil {
1140 return err
1141 }
1142 }
1143
1144 return nil
1145}