Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "sync"
16
17 "time"
18
19 "github.com/avast/retry-go/v4"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 jmodels "github.com/bluesky-social/jetstream/pkg/models"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/ipfs/go-cid"
24 "golang.org/x/sync/errgroup"
25 "tangled.org/core/api/tangled"
26 "tangled.org/core/appview/config"
27 "tangled.org/core/appview/db"
28 "tangled.org/core/appview/mentions"
29 "tangled.org/core/appview/models"
30 "tangled.org/core/appview/notify"
31 "tangled.org/core/appview/serververify"
32 "tangled.org/core/appview/validator"
33 "tangled.org/core/idresolver"
34 "tangled.org/core/orm"
35 "tangled.org/core/rbac"
36)
37
// Ingester holds the dependencies shared by every jetstream ingest handler
// defined in this file.
type Ingester struct {
	// Db is the database handle the handlers read from and write to.
	Db db.DbWrapper
	// Enforcer is the RBAC enforcer consulted for permission checks and
	// updated when knots, spindles and their members change.
	Enforcer *rbac.Enforcer
	// IdResolver resolves identities and is asked to invalidate them on
	// account and identity events.
	IdResolver *idresolver.Resolver
	// Config is the appview configuration; handlers read Core.Dev from it
	// when verifying knots and spindles.
	Config *config.Config
	// Logger is the base logger from which each handler derives its own.
	Logger *slog.Logger
	// Validator validates strings, issues and pulls before they are stored.
	Validator *validator.Validator
	// MentionsResolver extracts mentions and references from comment bodies.
	MentionsResolver *mentions.Resolver
	// Notifier is informed about newly created comments.
	Notifier notify.Notifier
}
48
// processFunc is the signature of a handler that processes one jetstream event.
type processFunc func(ctx context.Context, e *jmodels.Event) error
50
51func (i *Ingester) Ingest() processFunc {
52 return func(ctx context.Context, e *jmodels.Event) error {
53 var err error
54
55 l := i.Logger.With("kind", e.Kind)
56 switch e.Kind {
57 case jmodels.EventKindAccount:
58 // TODO: sync account state to db
59 if e.Account.Active {
60 break
61 }
62 // TODO: revoke sessions by DID
63 if *e.Account.Status == "deactivated" {
64 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
65 }
66 case jmodels.EventKindIdentity:
67 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
68 case jmodels.EventKindCommit:
69 switch e.Commit.Collection {
70 case tangled.GraphFollowNSID:
71 err = i.ingestFollow(e)
72 case tangled.FeedStarNSID:
73 err = i.ingestStar(e)
74 case tangled.PublicKeyNSID:
75 err = i.ingestPublicKey(e)
76 case tangled.RepoArtifactNSID:
77 err = i.ingestArtifact(e)
78 case tangled.ActorProfileNSID:
79 err = i.ingestProfile(ctx, e)
80 case tangled.SpindleMemberNSID:
81 err = i.ingestSpindleMember(ctx, e)
82 case tangled.SpindleNSID:
83 err = i.ingestSpindle(ctx, e)
84 case tangled.KnotMemberNSID:
85 err = i.ingestKnotMember(e)
86 case tangled.KnotNSID:
87 err = i.ingestKnot(e)
88 case tangled.StringNSID:
89 err = i.ingestString(e)
90 case tangled.RepoIssueNSID:
91 err = i.ingestIssue(ctx, e)
92 case tangled.RepoPullNSID:
93 err = i.ingestPull(ctx, e)
94 case tangled.FeedCommentNSID:
95 err = i.ingestComment(e)
96 case tangled.RepoIssueCommentNSID:
97 err = i.ingestIssueComment(e)
98 case tangled.RepoPullCommentNSID:
99 err = i.ingestPullComment(e)
100 case tangled.LabelDefinitionNSID:
101 err = i.ingestLabelDefinition(e)
102 case tangled.LabelOpNSID:
103 err = i.ingestLabelOp(e)
104 }
105 l = i.Logger.With("nsid", e.Commit.Collection)
106 }
107
108 if err != nil {
109 l.Warn("failed to ingest record, skipping", "err", err)
110 }
111
112 lastTimeUs := e.TimeUS + 1
113 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
114 l.Error("failed to save cursor", "err", saveErr)
115 }
116
117 return nil
118 }
119}
120
121func (i *Ingester) ingestStar(e *jmodels.Event) error {
122 var err error
123 did := e.Did
124
125 l := i.Logger.With("handler", "ingestStar")
126 l = l.With("nsid", e.Commit.Collection)
127
128 switch e.Commit.Operation {
129 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
130 var subjectUri syntax.ATURI
131
132 raw := json.RawMessage(e.Commit.Record)
133 record := tangled.FeedStar{}
134 err := json.Unmarshal(raw, &record)
135 if err != nil {
136 l.Error("invalid record", "err", err)
137 return err
138 }
139
140 star := &models.Star{
141 Did: did,
142 Rkey: e.Commit.RKey,
143 }
144
145 switch {
146 case record.SubjectDid != nil:
147 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
148 if repoErr == nil {
149 subjectUri = repo.RepoAt()
150 star.RepoAt = subjectUri
151 }
152 case record.Subject != nil:
153 subjectUri, err = syntax.ParseATURI(*record.Subject)
154 if err != nil {
155 l.Error("invalid record", "err", err)
156 return err
157 }
158 star.RepoAt = subjectUri
159 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
160 if repoErr == nil && repo.RepoDid != "" {
161 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
162 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
163 }
164 }
165 default:
166 l.Error("star record has neither subject nor subjectDid")
167 return fmt.Errorf("star record has neither subject nor subjectDid")
168 }
169 err = db.AddStar(i.Db, star)
170 case jmodels.CommitOperationDelete:
171 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
172 }
173
174 if err != nil {
175 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
176 }
177
178 return nil
179}
180
181func (i *Ingester) ingestFollow(e *jmodels.Event) error {
182 var err error
183 did := e.Did
184
185 l := i.Logger.With("handler", "ingestFollow")
186 l = l.With("nsid", e.Commit.Collection)
187
188 switch e.Commit.Operation {
189 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
190 raw := json.RawMessage(e.Commit.Record)
191 record := tangled.GraphFollow{}
192 err = json.Unmarshal(raw, &record)
193 if err != nil {
194 l.Error("invalid record", "err", err)
195 return err
196 }
197
198 err = db.AddFollow(i.Db, &models.Follow{
199 UserDid: did,
200 SubjectDid: record.Subject,
201 Rkey: e.Commit.RKey,
202 })
203 case jmodels.CommitOperationDelete:
204 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
205 }
206
207 if err != nil {
208 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
209 }
210
211 return nil
212}
213
214func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
215 did := e.Did
216 var err error
217
218 l := i.Logger.With("handler", "ingestPublicKey")
219 l = l.With("nsid", e.Commit.Collection)
220
221 switch e.Commit.Operation {
222 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
223 l.Debug("processing add of pubkey")
224 raw := json.RawMessage(e.Commit.Record)
225 record := tangled.PublicKey{}
226 err = json.Unmarshal(raw, &record)
227 if err != nil {
228 l.Error("invalid record", "err", err)
229 return err
230 }
231
232 name := record.Name
233 key := record.Key
234 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
235 case jmodels.CommitOperationDelete:
236 l.Debug("processing delete of pubkey")
237 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
238 }
239
240 if err != nil {
241 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
242 }
243
244 return nil
245}
246
247func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
248 did := e.Did
249 var err error
250
251 l := i.Logger.With("handler", "ingestArtifact")
252 l = l.With("nsid", e.Commit.Collection)
253
254 switch e.Commit.Operation {
255 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
256 raw := json.RawMessage(e.Commit.Record)
257 record := tangled.RepoArtifact{}
258 err = json.Unmarshal(raw, &record)
259 if err != nil {
260 l.Error("invalid record", "err", err)
261 return err
262 }
263
264 var repo *models.Repo
265 if record.RepoDid != nil && *record.RepoDid != "" {
266 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
267 if err != nil && !errors.Is(err, sql.ErrNoRows) {
268 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
269 }
270 }
271 if repo == nil && record.Repo != nil {
272 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
273 if parseErr != nil {
274 return parseErr
275 }
276 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
277 if err != nil {
278 return err
279 }
280 }
281 if repo == nil {
282 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
283 }
284
285 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
286 if err != nil || !ok {
287 return err
288 }
289
290 repoDid := repo.RepoDid
291 if repoDid == "" && record.RepoDid != nil {
292 repoDid = *record.RepoDid
293 }
294 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
295 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
296 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
297 }
298 }
299
300 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
301 if err != nil {
302 createdAt = time.Now()
303 }
304
305 artifact := models.Artifact{
306 Did: did,
307 Rkey: e.Commit.RKey,
308 RepoAt: repo.RepoAt(),
309 Tag: plumbing.Hash(record.Tag),
310 CreatedAt: createdAt,
311 BlobCid: cid.Cid(record.Artifact.Ref),
312 Name: record.Name,
313 Size: uint64(record.Artifact.Size),
314 MimeType: record.Artifact.MimeType,
315 }
316
317 err = db.AddArtifact(i.Db, artifact)
318 case jmodels.CommitOperationDelete:
319 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
320 }
321
322 if err != nil {
323 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
324 }
325
326 return nil
327}
328
329func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
330 did := e.Did
331 var err error
332
333 l := i.Logger.With("handler", "ingestProfile")
334 l = l.With("nsid", e.Commit.Collection)
335
336 if e.Commit.RKey != "self" {
337 return fmt.Errorf("ingestProfile only ingests `self` record")
338 }
339
340 switch e.Commit.Operation {
341 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
342 raw := json.RawMessage(e.Commit.Record)
343 record := tangled.ActorProfile{}
344 err = json.Unmarshal(raw, &record)
345 if err != nil {
346 l.Error("invalid record", "err", err)
347 return err
348 }
349
350 avatar := ""
351 if record.Avatar != nil {
352 avatar = record.Avatar.Ref.String()
353 }
354
355 description := ""
356 if record.Description != nil {
357 description = *record.Description
358 }
359
360 includeBluesky := record.Bluesky
361
362 pronouns := ""
363 if record.Pronouns != nil {
364 pronouns = *record.Pronouns
365 }
366
367 location := ""
368 if record.Location != nil {
369 location = *record.Location
370 }
371
372 var links [5]string
373 for i, l := range record.Links {
374 if i < 5 {
375 links[i] = l
376 }
377 }
378
379 var stats [2]models.VanityStat
380 for i, s := range record.Stats {
381 if i < 2 {
382 stats[i].Kind = models.ParseVanityStatKind(s)
383 }
384 }
385
386 var pinned [6]string
387 for i, r := range record.PinnedRepositories {
388 if i < 6 {
389 pinned[i] = r
390 }
391 }
392
393 var preferredHandle syntax.Handle
394 if record.PreferredHandle != nil {
395 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
396 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
397 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
398 preferredHandle = h
399 }
400 }
401 }
402
403 profile := models.Profile{
404 Did: did,
405 Avatar: avatar,
406 Description: description,
407 IncludeBluesky: includeBluesky,
408 Location: location,
409 Links: links,
410 Stats: stats,
411 PinnedRepos: pinned,
412 Pronouns: pronouns,
413 PreferredHandle: preferredHandle,
414 }
415
416 ddb, ok := i.Db.Execer.(*db.DB)
417 if !ok {
418 return fmt.Errorf("failed to index profile record, invalid db cast")
419 }
420
421 tx, err := ddb.Begin()
422 if err != nil {
423 return fmt.Errorf("failed to start transaction")
424 }
425
426 err = db.ValidateProfile(tx, &profile)
427 if err != nil {
428 return fmt.Errorf("invalid profile record")
429 }
430
431 err = db.UpsertProfile(tx, &profile)
432 case jmodels.CommitOperationDelete:
433 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
434 }
435
436 if err != nil {
437 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
438 }
439
440 return nil
441}
442
443func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
444 did := e.Did
445 var err error
446
447 l := i.Logger.With("handler", "ingestSpindleMember")
448 l = l.With("nsid", e.Commit.Collection)
449
450 switch e.Commit.Operation {
451 case jmodels.CommitOperationCreate:
452 raw := json.RawMessage(e.Commit.Record)
453 record := tangled.SpindleMember{}
454 err = json.Unmarshal(raw, &record)
455 if err != nil {
456 l.Error("invalid record", "err", err)
457 return err
458 }
459
460 // only spindle owner can invite to spindles
461 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
462 if err != nil || !ok {
463 return fmt.Errorf("failed to enforce permissions: %w", err)
464 }
465
466 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
467 if err != nil {
468 return err
469 }
470
471 if memberId.Handle.IsInvalidHandle() {
472 return err
473 }
474
475 ddb, ok := i.Db.Execer.(*db.DB)
476 if !ok {
477 return fmt.Errorf("invalid db cast")
478 }
479
480 err = db.AddSpindleMember(ddb, models.SpindleMember{
481 Did: syntax.DID(did),
482 Rkey: e.Commit.RKey,
483 Instance: record.Instance,
484 Subject: memberId.DID,
485 })
486 if !ok {
487 return fmt.Errorf("failed to add to db: %w", err)
488 }
489
490 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
491 if err != nil {
492 return fmt.Errorf("failed to update ACLs: %w", err)
493 }
494
495 l.Info("added spindle member")
496 case jmodels.CommitOperationDelete:
497 rkey := e.Commit.RKey
498
499 ddb, ok := i.Db.Execer.(*db.DB)
500 if !ok {
501 return fmt.Errorf("failed to index profile record, invalid db cast")
502 }
503
504 // get record from db first
505 members, err := db.GetSpindleMembers(
506 ddb,
507 orm.FilterEq("did", did),
508 orm.FilterEq("rkey", rkey),
509 )
510 if err != nil || len(members) != 1 {
511 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
512 }
513 member := members[0]
514
515 tx, err := ddb.Begin()
516 if err != nil {
517 return fmt.Errorf("failed to start txn: %w", err)
518 }
519
520 // remove record by rkey && update enforcer
521 if err = db.RemoveSpindleMember(
522 tx,
523 orm.FilterEq("did", did),
524 orm.FilterEq("rkey", rkey),
525 ); err != nil {
526 return fmt.Errorf("failed to remove from db: %w", err)
527 }
528
529 // update enforcer
530 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
531 if err != nil {
532 return fmt.Errorf("failed to update ACLs: %w", err)
533 }
534
535 if err = tx.Commit(); err != nil {
536 return fmt.Errorf("failed to commit txn: %w", err)
537 }
538
539 if err = i.Enforcer.E.SavePolicy(); err != nil {
540 return fmt.Errorf("failed to save ACLs: %w", err)
541 }
542
543 l.Info("removed spindle member")
544 }
545
546 return nil
547}
548
549func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
550 did := e.Did
551 var err error
552
553 l := i.Logger.With("handler", "ingestSpindle")
554 l = l.With("nsid", e.Commit.Collection)
555
556 switch e.Commit.Operation {
557 case jmodels.CommitOperationCreate:
558 raw := json.RawMessage(e.Commit.Record)
559 record := tangled.Spindle{}
560 err = json.Unmarshal(raw, &record)
561 if err != nil {
562 l.Error("invalid record", "err", err)
563 return err
564 }
565
566 instance := e.Commit.RKey
567
568 ddb, ok := i.Db.Execer.(*db.DB)
569 if !ok {
570 return fmt.Errorf("failed to index profile record, invalid db cast")
571 }
572
573 err := db.AddSpindle(ddb, models.Spindle{
574 Owner: syntax.DID(did),
575 Instance: instance,
576 })
577 if err != nil {
578 l.Error("failed to add spindle to db", "err", err, "instance", instance)
579 return err
580 }
581
582 err = retry.Do(
583 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
584 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
585 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
586 )
587 if err != nil {
588 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
589 return err
590 }
591
592 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
593 if err != nil {
594 return fmt.Errorf("failed to mark verified: %w", err)
595 }
596
597 return nil
598
599 case jmodels.CommitOperationDelete:
600 instance := e.Commit.RKey
601
602 ddb, ok := i.Db.Execer.(*db.DB)
603 if !ok {
604 return fmt.Errorf("failed to index profile record, invalid db cast")
605 }
606
607 // get record from db first
608 spindles, err := db.GetSpindles(
609 ctx,
610 ddb,
611 orm.FilterEq("owner", did),
612 orm.FilterEq("instance", instance),
613 )
614 if err != nil || len(spindles) != 1 {
615 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
616 }
617 spindle := spindles[0]
618
619 tx, err := ddb.Begin()
620 if err != nil {
621 return err
622 }
623 defer func() {
624 tx.Rollback()
625 i.Enforcer.E.LoadPolicy()
626 }()
627
628 // remove spindle members first
629 err = db.RemoveSpindleMember(
630 tx,
631 orm.FilterEq("owner", did),
632 orm.FilterEq("instance", instance),
633 )
634 if err != nil {
635 return err
636 }
637
638 err = db.DeleteSpindle(
639 tx,
640 orm.FilterEq("owner", did),
641 orm.FilterEq("instance", instance),
642 )
643 if err != nil {
644 return err
645 }
646
647 if spindle.Verified != nil {
648 err = i.Enforcer.RemoveSpindle(instance)
649 if err != nil {
650 return err
651 }
652 }
653
654 err = tx.Commit()
655 if err != nil {
656 return err
657 }
658
659 err = i.Enforcer.E.SavePolicy()
660 if err != nil {
661 return err
662 }
663 }
664
665 return nil
666}
667
668func (i *Ingester) ingestString(e *jmodels.Event) error {
669 did := e.Did
670 rkey := e.Commit.RKey
671
672 var err error
673
674 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
675 l.Info("ingesting record")
676
677 ddb, ok := i.Db.Execer.(*db.DB)
678 if !ok {
679 return fmt.Errorf("failed to index string record, invalid db cast")
680 }
681
682 switch e.Commit.Operation {
683 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
684 raw := json.RawMessage(e.Commit.Record)
685 record := tangled.String{}
686 err = json.Unmarshal(raw, &record)
687 if err != nil {
688 l.Error("invalid record", "err", err)
689 return err
690 }
691
692 string := models.StringFromRecord(did, rkey, record)
693
694 if err = i.Validator.ValidateString(&string); err != nil {
695 l.Error("invalid record", "err", err)
696 return err
697 }
698
699 if err = db.AddString(ddb, string); err != nil {
700 l.Error("failed to add string", "err", err)
701 return err
702 }
703
704 return nil
705
706 case jmodels.CommitOperationDelete:
707 if err := db.DeleteString(
708 ddb,
709 orm.FilterEq("did", did),
710 orm.FilterEq("rkey", rkey),
711 ); err != nil {
712 l.Error("failed to delete", "err", err)
713 return fmt.Errorf("failed to delete string record: %w", err)
714 }
715
716 return nil
717 }
718
719 return nil
720}
721
722func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
723 did := e.Did
724 var err error
725
726 l := i.Logger.With("handler", "ingestKnotMember")
727 l = l.With("nsid", e.Commit.Collection)
728
729 switch e.Commit.Operation {
730 case jmodels.CommitOperationCreate:
731 raw := json.RawMessage(e.Commit.Record)
732 record := tangled.KnotMember{}
733 err = json.Unmarshal(raw, &record)
734 if err != nil {
735 l.Error("invalid record", "err", err)
736 return err
737 }
738
739 // only knot owner can invite to knots
740 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
741 if err != nil || !ok {
742 return fmt.Errorf("failed to enforce permissions: %w", err)
743 }
744
745 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
746 if err != nil {
747 return err
748 }
749
750 if memberId.Handle.IsInvalidHandle() {
751 return err
752 }
753
754 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
755 if err != nil {
756 return fmt.Errorf("failed to update ACLs: %w", err)
757 }
758
759 l.Info("added knot member")
760 case jmodels.CommitOperationDelete:
761 // we don't store knot members in a table (like we do for spindle)
762 // and we can't remove this just yet. possibly fixed if we switch
763 // to either:
764 // 1. a knot_members table like with spindle and store the rkey
765 // 2. use the knot host as the rkey
766 //
767 // TODO: implement member deletion
768 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
769 }
770
771 return nil
772}
773
774func (i *Ingester) ingestKnot(e *jmodels.Event) error {
775 did := e.Did
776 var err error
777
778 l := i.Logger.With("handler", "ingestKnot")
779 l = l.With("nsid", e.Commit.Collection)
780
781 switch e.Commit.Operation {
782 case jmodels.CommitOperationCreate:
783 raw := json.RawMessage(e.Commit.Record)
784 record := tangled.Knot{}
785 err = json.Unmarshal(raw, &record)
786 if err != nil {
787 l.Error("invalid record", "err", err)
788 return err
789 }
790
791 domain := e.Commit.RKey
792
793 ddb, ok := i.Db.Execer.(*db.DB)
794 if !ok {
795 return fmt.Errorf("failed to index profile record, invalid db cast")
796 }
797
798 err := db.AddKnot(ddb, domain, did)
799 if err != nil {
800 l.Error("failed to add knot to db", "err", err, "domain", domain)
801 return err
802 }
803
804 err = retry.Do(
805 func() error {
806 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
807 },
808 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
809 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
810 )
811 if err != nil {
812 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
813 return err
814 }
815
816 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
817 if err != nil {
818 return fmt.Errorf("failed to mark verified: %w", err)
819 }
820
821 return nil
822
823 case jmodels.CommitOperationDelete:
824 domain := e.Commit.RKey
825
826 ddb, ok := i.Db.Execer.(*db.DB)
827 if !ok {
828 return fmt.Errorf("failed to index knot record, invalid db cast")
829 }
830
831 // get record from db first
832 registrations, err := db.GetRegistrations(
833 ddb,
834 orm.FilterEq("domain", domain),
835 orm.FilterEq("did", did),
836 )
837 if err != nil {
838 return fmt.Errorf("failed to get registration: %w", err)
839 }
840 if len(registrations) != 1 {
841 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
842 }
843 registration := registrations[0]
844
845 tx, err := ddb.Begin()
846 if err != nil {
847 return err
848 }
849 defer func() {
850 tx.Rollback()
851 i.Enforcer.E.LoadPolicy()
852 }()
853
854 err = db.DeleteKnot(
855 tx,
856 orm.FilterEq("did", did),
857 orm.FilterEq("domain", domain),
858 )
859 if err != nil {
860 return err
861 }
862
863 if registration.Registered != nil {
864 err = i.Enforcer.RemoveKnot(domain)
865 if err != nil {
866 return err
867 }
868 }
869
870 err = tx.Commit()
871 if err != nil {
872 return err
873 }
874
875 err = i.Enforcer.E.SavePolicy()
876 if err != nil {
877 return err
878 }
879 }
880
881 return nil
882}
883func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
884 did := e.Did
885 rkey := e.Commit.RKey
886
887 var err error
888
889 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
890 l.Info("ingesting record")
891
892 ddb, ok := i.Db.Execer.(*db.DB)
893 if !ok {
894 return fmt.Errorf("failed to index issue record, invalid db cast")
895 }
896
897 switch e.Commit.Operation {
898 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
899 raw := json.RawMessage(e.Commit.Record)
900 record := tangled.RepoIssue{}
901 err = json.Unmarshal(raw, &record)
902 if err != nil {
903 l.Error("invalid record", "err", err)
904 return err
905 }
906
907 issue := models.IssueFromRecord(did, rkey, record)
908
909 if issue.RepoAt == "" {
910 return fmt.Errorf("issue record has no repo field")
911 }
912
913 if err := i.Validator.ValidateIssue(&issue); err != nil {
914 return fmt.Errorf("failed to validate issue: %w", err)
915 }
916
917 if record.Repo != nil {
918 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
919 if repoErr == nil && repo.RepoDid != "" {
920 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
921 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
922 }
923 }
924 }
925
926 tx, err := ddb.BeginTx(ctx, nil)
927 if err != nil {
928 l.Error("failed to begin transaction", "err", err)
929 return err
930 }
931 defer tx.Rollback()
932
933 err = db.PutIssue(tx, &issue)
934 if err != nil {
935 l.Error("failed to create issue", "err", err)
936 return err
937 }
938
939 err = tx.Commit()
940 if err != nil {
941 l.Error("failed to commit txn", "err", err)
942 return err
943 }
944
945 return nil
946
947 case jmodels.CommitOperationDelete:
948 tx, err := ddb.BeginTx(ctx, nil)
949 if err != nil {
950 l.Error("failed to begin transaction", "err", err)
951 return err
952 }
953 defer tx.Rollback()
954
955 if err := db.DeleteIssues(
956 tx,
957 did,
958 rkey,
959 ); err != nil {
960 l.Error("failed to delete", "err", err)
961 return fmt.Errorf("failed to delete issue record: %w", err)
962 }
963 if err := tx.Commit(); err != nil {
964 l.Error("failed to commit txn", "err", err)
965 return err
966 }
967
968 return nil
969 }
970
971 return nil
972}
973
974func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
975 did := e.Did
976 rkey := e.Commit.RKey
977
978 var err error
979
980 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
981 l.Info("ingesting record")
982
983 ddb, ok := i.Db.Execer.(*db.DB)
984 if !ok {
985 return fmt.Errorf("failed to index pull record, invalid db cast")
986 }
987
988 switch e.Commit.Operation {
989 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
990 raw := json.RawMessage(e.Commit.Record)
991 record := tangled.RepoPull{}
992 err = json.Unmarshal(raw, &record)
993 if err != nil {
994 l.Error("invalid record", "err", err)
995 return err
996 }
997
998 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
999 if err != nil {
1000 l.Error("failed to resolve did")
1001 return err
1002 }
1003
1004 // go through and fetch all blobs in parallel
1005 readers := make([]*io.ReadCloser, len(record.Rounds))
1006 var mu sync.Mutex
1007
1008 g, gctx := errgroup.WithContext(ctx)
1009
1010 for idx, b := range record.Rounds {
1011 g.Go(func() error {
1012 // for some reason, a blob is empty
1013 if b.PatchBlob == nil {
1014 return fmt.Errorf("missing patchBlob in round %d", idx)
1015 }
1016
1017 ownerPds := ownerId.PDSEndpoint()
1018 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1019 q := url.Query()
1020 q.Set("cid", b.PatchBlob.Ref.String())
1021 q.Set("did", did)
1022 url.RawQuery = q.Encode()
1023
1024 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1025 if err != nil {
1026 l.Error("failed to create request")
1027 return err
1028 }
1029 req.Header.Set("Content-Type", "application/json")
1030
1031 resp, err := http.DefaultClient.Do(req)
1032 if err != nil {
1033 l.Error("failed to make request")
1034 return err
1035 }
1036
1037 mu.Lock()
1038 readers[idx] = &resp.Body
1039 mu.Unlock()
1040
1041 return nil
1042 })
1043 }
1044
1045 if err := g.Wait(); err != nil {
1046 for _, r := range readers {
1047 if r != nil && *r != nil {
1048 (*r).Close()
1049 }
1050 }
1051 return err
1052 }
1053
1054 defer func() {
1055 for _, r := range readers {
1056 if r != nil && *r != nil {
1057 (*r).Close()
1058 }
1059 }
1060 }()
1061
1062 pull, err := models.PullFromRecord(did, rkey, record, readers)
1063 if err != nil {
1064 return fmt.Errorf("failed to parse pull from record: %w", err)
1065 }
1066 if err := i.Validator.ValidatePull(pull); err != nil {
1067 return fmt.Errorf("failed to validate pull: %w", err)
1068 }
1069
1070 tx, err := ddb.BeginTx(ctx, nil)
1071 if err != nil {
1072 l.Error("failed to begin transaction", "err", err)
1073 return err
1074 }
1075 defer tx.Rollback()
1076
1077 err = db.PutPull(tx, pull)
1078 if err != nil {
1079 l.Error("failed to create pull", "err", err)
1080 return err
1081 }
1082
1083 err = tx.Commit()
1084 if err != nil {
1085 l.Error("failed to commit txn", "err", err)
1086 return err
1087 }
1088
1089 return nil
1090
1091 case jmodels.CommitOperationDelete:
1092 tx, err := ddb.BeginTx(ctx, nil)
1093 if err != nil {
1094 l.Error("failed to begin transaction", "err", err)
1095 return err
1096 }
1097 defer tx.Rollback()
1098
1099 if err := db.AbandonPulls(
1100 tx,
1101 orm.FilterEq("owner_did", did),
1102 orm.FilterEq("rkey", rkey),
1103 ); err != nil {
1104 l.Error("failed to abandon", "err", err)
1105 return fmt.Errorf("failed to abandon pull record: %w", err)
1106 }
1107 if err := tx.Commit(); err != nil {
1108 l.Error("failed to commit txn", "err", err)
1109 return err
1110 }
1111
1112 return nil
1113 }
1114
1115 return nil
1116}
1117
1118// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1119func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1120 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1121 l.Info("ingesting record")
1122
1123 switch e.Commit.Operation {
1124 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1125 // no-op. sh.tangled.repo.issue.comment is deprecated
1126
1127 case jmodels.CommitOperationDelete:
1128 if err := db.PurgeComments(
1129 i.Db,
1130 orm.FilterEq("did", e.Did),
1131 orm.FilterEq("collection", e.Commit.Collection),
1132 orm.FilterEq("rkey", e.Commit.RKey),
1133 ); err != nil {
1134 return fmt.Errorf("failed to delete comment record: %w", err)
1135 }
1136 }
1137
1138 return nil
1139}
1140
1141// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1142func (i *Ingester) ingestPullComment(e *jmodels.Event) error {
1143 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1144 l.Info("ingesting record")
1145
1146 switch e.Commit.Operation {
1147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1148 // no-op. sh.tangled.repo.pull.comment is deprecated
1149
1150 case jmodels.CommitOperationDelete:
1151 if err := db.PurgeComments(
1152 i.Db,
1153 orm.FilterEq("did", e.Did),
1154 orm.FilterEq("collection", e.Commit.Collection),
1155 orm.FilterEq("rkey", e.Commit.RKey),
1156 ); err != nil {
1157 return fmt.Errorf("failed to delete comment record: %w", err)
1158 }
1159 }
1160
1161 return nil
1162}
1163
1164func (i *Ingester) ingestComment(e *jmodels.Event) error {
1165 did := e.Did
1166 rkey := e.Commit.RKey
1167 cid := e.Commit.CID
1168
1169 var err error
1170
1171 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1172 l.Info("ingesting record")
1173
1174 ddb, ok := i.Db.Execer.(*db.DB)
1175 if !ok {
1176 return fmt.Errorf("failed to index issue comment record, invalid db cast")
1177 }
1178
1179 ctx := context.Background()
1180
1181 switch e.Commit.Operation {
1182 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1183 raw := json.RawMessage(e.Commit.Record)
1184 record := tangled.FeedComment{}
1185 err = json.Unmarshal(raw, &record)
1186 if err != nil {
1187 return fmt.Errorf("invalid record: %w", err)
1188 }
1189
1190 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1191 if err != nil {
1192 return fmt.Errorf("failed to parse comment from record: %w", err)
1193 }
1194
1195 if err := comment.Validate(); err != nil {
1196 return fmt.Errorf("failed to validate comment: %w", err)
1197 }
1198
1199 var mentions []syntax.DID
1200 var references []syntax.ATURI
1201 if comment.Body.Original != nil {
1202 mentions, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1203 }
1204
1205 tx, err := ddb.Begin()
1206 if err != nil {
1207 return fmt.Errorf("failed to start transaction: %w", err)
1208 }
1209 defer tx.Rollback()
1210
1211 err = db.PutComment(tx, comment, references)
1212 if err != nil {
1213 return fmt.Errorf("failed to create comment: %w", err)
1214 }
1215
1216 if err := tx.Commit(); err != nil {
1217 return err
1218 }
1219
1220 if e.Commit.Operation == jmodels.CommitOperationCreate {
1221 i.Notifier.NewComment(ctx, comment, mentions)
1222 }
1223
1224 case jmodels.CommitOperationDelete:
1225 if err := db.DeleteComments(
1226 ddb,
1227 orm.FilterEq("did", did),
1228 orm.FilterEq("collection", e.Commit.Collection),
1229 orm.FilterEq("rkey", rkey),
1230 ); err != nil {
1231 return fmt.Errorf("failed to delete comment record: %w", err)
1232 }
1233
1234 return nil
1235 }
1236
1237 return nil
1238}
1239
1240func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1241 did := e.Did
1242 rkey := e.Commit.RKey
1243
1244 var err error
1245
1246 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1247 l.Info("ingesting record")
1248
1249 ddb, ok := i.Db.Execer.(*db.DB)
1250 if !ok {
1251 return fmt.Errorf("failed to index label definition, invalid db cast")
1252 }
1253
1254 switch e.Commit.Operation {
1255 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1256 raw := json.RawMessage(e.Commit.Record)
1257 record := tangled.LabelDefinition{}
1258 err = json.Unmarshal(raw, &record)
1259 if err != nil {
1260 return fmt.Errorf("invalid record: %w", err)
1261 }
1262
1263 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1264 if err != nil {
1265 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1266 }
1267
1268 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1269 return fmt.Errorf("failed to validate labeldef: %w", err)
1270 }
1271
1272 _, err = db.AddLabelDefinition(ddb, def)
1273 if err != nil {
1274 return fmt.Errorf("failed to create labeldef: %w", err)
1275 }
1276
1277 return nil
1278
1279 case jmodels.CommitOperationDelete:
1280 if err := db.DeleteLabelDefinition(
1281 ddb,
1282 orm.FilterEq("did", did),
1283 orm.FilterEq("rkey", rkey),
1284 ); err != nil {
1285 return fmt.Errorf("failed to delete labeldef record: %w", err)
1286 }
1287
1288 return nil
1289 }
1290
1291 return nil
1292}
1293
1294func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1295 did := e.Did
1296 rkey := e.Commit.RKey
1297
1298 var err error
1299
1300 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1301 l.Info("ingesting record")
1302
1303 ddb, ok := i.Db.Execer.(*db.DB)
1304 if !ok {
1305 return fmt.Errorf("failed to index label op, invalid db cast")
1306 }
1307
1308 switch e.Commit.Operation {
1309 case jmodels.CommitOperationCreate:
1310 raw := json.RawMessage(e.Commit.Record)
1311 record := tangled.LabelOp{}
1312 err = json.Unmarshal(raw, &record)
1313 if err != nil {
1314 return fmt.Errorf("invalid record: %w", err)
1315 }
1316
1317 subject := syntax.ATURI(record.Subject)
1318 collection := subject.Collection()
1319
1320 var repo *models.Repo
1321 switch collection {
1322 case tangled.RepoIssueNSID:
1323 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1324 if err != nil || len(i) != 1 {
1325 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1326 }
1327 repo = i[0].Repo
1328 default:
1329 return fmt.Errorf("unsupported label subject: %s", collection)
1330 }
1331
1332 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1333 if err != nil {
1334 return fmt.Errorf("failed to build label application ctx: %w", err)
1335 }
1336
1337 ops := models.LabelOpsFromRecord(did, rkey, record)
1338
1339 for _, o := range ops {
1340 def, ok := actx.Defs[o.OperandKey]
1341 if !ok {
1342 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1343 }
1344 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1345 return fmt.Errorf("failed to validate labelop: %w", err)
1346 }
1347 }
1348
1349 tx, err := ddb.Begin()
1350 if err != nil {
1351 return err
1352 }
1353 defer tx.Rollback()
1354
1355 for _, o := range ops {
1356 _, err = db.AddLabelOp(tx, &o)
1357 if err != nil {
1358 return fmt.Errorf("failed to add labelop: %w", err)
1359 }
1360 }
1361
1362 if err = tx.Commit(); err != nil {
1363 return err
1364 }
1365 }
1366
1367 return nil
1368}