forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/spindleverify"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 case tangled.StringNSID:
68 err = i.ingestString(e)
69 }
70 l = i.Logger.With("nsid", e.Commit.Collection)
71 }
72
73 if err != nil {
74 l.Error("error ingesting record", "err", err)
75 }
76
77 return err
78 }
79}
80
81func (i *Ingester) ingestStar(e *models.Event) error {
82 var err error
83 did := e.Did
84
85 l := i.Logger.With("handler", "ingestStar")
86 l = l.With("nsid", e.Commit.Collection)
87
88 switch e.Commit.Operation {
89 case models.CommitOperationCreate, models.CommitOperationUpdate:
90 var subjectUri syntax.ATURI
91
92 raw := json.RawMessage(e.Commit.Record)
93 record := tangled.FeedStar{}
94 err := json.Unmarshal(raw, &record)
95 if err != nil {
96 l.Error("invalid record", "err", err)
97 return err
98 }
99
100 subjectUri, err = syntax.ParseATURI(record.Subject)
101 if err != nil {
102 l.Error("invalid record", "err", err)
103 return err
104 }
105 err = db.AddStar(i.Db, &db.Star{
106 StarredByDid: did,
107 RepoAt: subjectUri,
108 Rkey: e.Commit.RKey,
109 })
110 case models.CommitOperationDelete:
111 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
112 }
113
114 if err != nil {
115 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
116 }
117
118 return nil
119}
120
121func (i *Ingester) ingestFollow(e *models.Event) error {
122 var err error
123 did := e.Did
124
125 l := i.Logger.With("handler", "ingestFollow")
126 l = l.With("nsid", e.Commit.Collection)
127
128 switch e.Commit.Operation {
129 case models.CommitOperationCreate, models.CommitOperationUpdate:
130 raw := json.RawMessage(e.Commit.Record)
131 record := tangled.GraphFollow{}
132 err = json.Unmarshal(raw, &record)
133 if err != nil {
134 l.Error("invalid record", "err", err)
135 return err
136 }
137
138 err = db.AddFollow(i.Db, &db.Follow{
139 UserDid: did,
140 SubjectDid: record.Subject,
141 Rkey: e.Commit.RKey,
142 })
143 case models.CommitOperationDelete:
144 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
145 }
146
147 if err != nil {
148 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
149 }
150
151 return nil
152}
153
154func (i *Ingester) ingestPublicKey(e *models.Event) error {
155 did := e.Did
156 var err error
157
158 l := i.Logger.With("handler", "ingestPublicKey")
159 l = l.With("nsid", e.Commit.Collection)
160
161 switch e.Commit.Operation {
162 case models.CommitOperationCreate, models.CommitOperationUpdate:
163 l.Debug("processing add of pubkey")
164 raw := json.RawMessage(e.Commit.Record)
165 record := tangled.PublicKey{}
166 err = json.Unmarshal(raw, &record)
167 if err != nil {
168 l.Error("invalid record", "err", err)
169 return err
170 }
171
172 name := record.Name
173 key := record.Key
174 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
175 case models.CommitOperationDelete:
176 l.Debug("processing delete of pubkey")
177 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
178 }
179
180 if err != nil {
181 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
182 }
183
184 return nil
185}
186
187func (i *Ingester) ingestArtifact(e *models.Event) error {
188 did := e.Did
189 var err error
190
191 l := i.Logger.With("handler", "ingestArtifact")
192 l = l.With("nsid", e.Commit.Collection)
193
194 switch e.Commit.Operation {
195 case models.CommitOperationCreate, models.CommitOperationUpdate:
196 raw := json.RawMessage(e.Commit.Record)
197 record := tangled.RepoArtifact{}
198 err = json.Unmarshal(raw, &record)
199 if err != nil {
200 l.Error("invalid record", "err", err)
201 return err
202 }
203
204 repoAt, err := syntax.ParseATURI(record.Repo)
205 if err != nil {
206 return err
207 }
208
209 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
210 if err != nil {
211 return err
212 }
213
214 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
215 if err != nil || !ok {
216 return err
217 }
218
219 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
220 if err != nil {
221 createdAt = time.Now()
222 }
223
224 artifact := db.Artifact{
225 Did: did,
226 Rkey: e.Commit.RKey,
227 RepoAt: repoAt,
228 Tag: plumbing.Hash(record.Tag),
229 CreatedAt: createdAt,
230 BlobCid: cid.Cid(record.Artifact.Ref),
231 Name: record.Name,
232 Size: uint64(record.Artifact.Size),
233 MimeType: record.Artifact.MimeType,
234 }
235
236 err = db.AddArtifact(i.Db, artifact)
237 case models.CommitOperationDelete:
238 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
239 }
240
241 if err != nil {
242 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
243 }
244
245 return nil
246}
247
248func (i *Ingester) ingestProfile(e *models.Event) error {
249 did := e.Did
250 var err error
251
252 l := i.Logger.With("handler", "ingestProfile")
253 l = l.With("nsid", e.Commit.Collection)
254
255 if e.Commit.RKey != "self" {
256 return fmt.Errorf("ingestProfile only ingests `self` record")
257 }
258
259 switch e.Commit.Operation {
260 case models.CommitOperationCreate, models.CommitOperationUpdate:
261 raw := json.RawMessage(e.Commit.Record)
262 record := tangled.ActorProfile{}
263 err = json.Unmarshal(raw, &record)
264 if err != nil {
265 l.Error("invalid record", "err", err)
266 return err
267 }
268
269 description := ""
270 if record.Description != nil {
271 description = *record.Description
272 }
273
274 includeBluesky := record.Bluesky
275
276 location := ""
277 if record.Location != nil {
278 location = *record.Location
279 }
280
281 var links [5]string
282 for i, l := range record.Links {
283 if i < 5 {
284 links[i] = l
285 }
286 }
287
288 var stats [2]db.VanityStat
289 for i, s := range record.Stats {
290 if i < 2 {
291 stats[i].Kind = db.VanityStatKind(s)
292 }
293 }
294
295 var pinned [6]syntax.ATURI
296 for i, r := range record.PinnedRepositories {
297 if i < 6 {
298 pinned[i] = syntax.ATURI(r)
299 }
300 }
301
302 profile := db.Profile{
303 Did: did,
304 Description: description,
305 IncludeBluesky: includeBluesky,
306 Location: location,
307 Links: links,
308 Stats: stats,
309 PinnedRepos: pinned,
310 }
311
312 ddb, ok := i.Db.Execer.(*db.DB)
313 if !ok {
314 return fmt.Errorf("failed to index profile record, invalid db cast")
315 }
316
317 tx, err := ddb.Begin()
318 if err != nil {
319 return fmt.Errorf("failed to start transaction")
320 }
321
322 err = db.ValidateProfile(tx, &profile)
323 if err != nil {
324 return fmt.Errorf("invalid profile record")
325 }
326
327 err = db.UpsertProfile(tx, &profile)
328 case models.CommitOperationDelete:
329 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
330 }
331
332 if err != nil {
333 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
334 }
335
336 return nil
337}
338
339func (i *Ingester) ingestSpindleMember(e *models.Event) error {
340 did := e.Did
341 var err error
342
343 l := i.Logger.With("handler", "ingestSpindleMember")
344 l = l.With("nsid", e.Commit.Collection)
345
346 switch e.Commit.Operation {
347 case models.CommitOperationCreate:
348 raw := json.RawMessage(e.Commit.Record)
349 record := tangled.SpindleMember{}
350 err = json.Unmarshal(raw, &record)
351 if err != nil {
352 l.Error("invalid record", "err", err)
353 return err
354 }
355
356 // only spindle owner can invite to spindles
357 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
358 if err != nil || !ok {
359 return fmt.Errorf("failed to enforce permissions: %w", err)
360 }
361
362 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
363 if err != nil {
364 return err
365 }
366
367 if memberId.Handle.IsInvalidHandle() {
368 return err
369 }
370
371 ddb, ok := i.Db.Execer.(*db.DB)
372 if !ok {
373 return fmt.Errorf("failed to index profile record, invalid db cast")
374 }
375
376 err = db.AddSpindleMember(ddb, db.SpindleMember{
377 Did: syntax.DID(did),
378 Rkey: e.Commit.RKey,
379 Instance: record.Instance,
380 Subject: memberId.DID,
381 })
382 if !ok {
383 return fmt.Errorf("failed to add to db: %w", err)
384 }
385
386 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
387 if err != nil {
388 return fmt.Errorf("failed to update ACLs: %w", err)
389 }
390
391 l.Info("added spindle member")
392 case models.CommitOperationDelete:
393 rkey := e.Commit.RKey
394
395 ddb, ok := i.Db.Execer.(*db.DB)
396 if !ok {
397 return fmt.Errorf("failed to index profile record, invalid db cast")
398 }
399
400 // get record from db first
401 members, err := db.GetSpindleMembers(
402 ddb,
403 db.FilterEq("did", did),
404 db.FilterEq("rkey", rkey),
405 )
406 if err != nil || len(members) != 1 {
407 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
408 }
409 member := members[0]
410
411 tx, err := ddb.Begin()
412 if err != nil {
413 return fmt.Errorf("failed to start txn: %w", err)
414 }
415
416 // remove record by rkey && update enforcer
417 if err = db.RemoveSpindleMember(
418 tx,
419 db.FilterEq("did", did),
420 db.FilterEq("rkey", rkey),
421 ); err != nil {
422 return fmt.Errorf("failed to remove from db: %w", err)
423 }
424
425 // update enforcer
426 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
427 if err != nil {
428 return fmt.Errorf("failed to update ACLs: %w", err)
429 }
430
431 if err = tx.Commit(); err != nil {
432 return fmt.Errorf("failed to commit txn: %w", err)
433 }
434
435 if err = i.Enforcer.E.SavePolicy(); err != nil {
436 return fmt.Errorf("failed to save ACLs: %w", err)
437 }
438
439 l.Info("removed spindle member")
440 }
441
442 return nil
443}
444
445func (i *Ingester) ingestSpindle(e *models.Event) error {
446 did := e.Did
447 var err error
448
449 l := i.Logger.With("handler", "ingestSpindle")
450 l = l.With("nsid", e.Commit.Collection)
451
452 switch e.Commit.Operation {
453 case models.CommitOperationCreate:
454 raw := json.RawMessage(e.Commit.Record)
455 record := tangled.Spindle{}
456 err = json.Unmarshal(raw, &record)
457 if err != nil {
458 l.Error("invalid record", "err", err)
459 return err
460 }
461
462 instance := e.Commit.RKey
463
464 ddb, ok := i.Db.Execer.(*db.DB)
465 if !ok {
466 return fmt.Errorf("failed to index profile record, invalid db cast")
467 }
468
469 err := db.AddSpindle(ddb, db.Spindle{
470 Owner: syntax.DID(did),
471 Instance: instance,
472 })
473 if err != nil {
474 l.Error("failed to add spindle to db", "err", err, "instance", instance)
475 return err
476 }
477
478 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
479 if err != nil {
480 l.Error("failed to add spindle to db", "err", err, "instance", instance)
481 return err
482 }
483
484 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
485 if err != nil {
486 return fmt.Errorf("failed to mark verified: %w", err)
487 }
488
489 return nil
490
491 case models.CommitOperationDelete:
492 instance := e.Commit.RKey
493
494 ddb, ok := i.Db.Execer.(*db.DB)
495 if !ok {
496 return fmt.Errorf("failed to index profile record, invalid db cast")
497 }
498
499 // get record from db first
500 spindles, err := db.GetSpindles(
501 ddb,
502 db.FilterEq("owner", did),
503 db.FilterEq("instance", instance),
504 )
505 if err != nil || len(spindles) != 1 {
506 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
507 }
508 spindle := spindles[0]
509
510 tx, err := ddb.Begin()
511 if err != nil {
512 return err
513 }
514 defer func() {
515 tx.Rollback()
516 i.Enforcer.E.LoadPolicy()
517 }()
518
519 // remove spindle members first
520 err = db.RemoveSpindleMember(
521 tx,
522 db.FilterEq("owner", did),
523 db.FilterEq("instance", instance),
524 )
525 if err != nil {
526 return err
527 }
528
529 err = db.DeleteSpindle(
530 tx,
531 db.FilterEq("owner", did),
532 db.FilterEq("instance", instance),
533 )
534 if err != nil {
535 return err
536 }
537
538 if spindle.Verified != nil {
539 err = i.Enforcer.RemoveSpindle(instance)
540 if err != nil {
541 return err
542 }
543 }
544
545 err = tx.Commit()
546 if err != nil {
547 return err
548 }
549
550 err = i.Enforcer.E.SavePolicy()
551 if err != nil {
552 return err
553 }
554 }
555
556 return nil
557}
558
559func (i *Ingester) ingestString(e *models.Event) error {
560 did := e.Did
561 rkey := e.Commit.RKey
562
563 var err error
564
565 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
566 l.Info("ingesting record")
567
568 ddb, ok := i.Db.Execer.(*db.DB)
569 if !ok {
570 return fmt.Errorf("failed to index string record, invalid db cast")
571 }
572
573 switch e.Commit.Operation {
574 case models.CommitOperationCreate, models.CommitOperationUpdate:
575 raw := json.RawMessage(e.Commit.Record)
576 record := tangled.String{}
577 err = json.Unmarshal(raw, &record)
578 if err != nil {
579 l.Error("invalid record", "err", err)
580 return err
581 }
582
583 string := db.StringFromRecord(did, rkey, record)
584
585 if err = string.Validate(); err != nil {
586 l.Error("invalid record", "err", err)
587 return err
588 }
589
590 if err = db.AddString(ddb, string); err != nil {
591 l.Error("failed to add string", "err", err)
592 return err
593 }
594
595 return nil
596
597 case models.CommitOperationDelete:
598 if err := db.DeleteString(
599 ddb,
600 db.FilterEq("did", did),
601 db.FilterEq("rkey", rkey),
602 ); err != nil {
603 l.Error("failed to delete", "err", err)
604 return fmt.Errorf("failed to delete string record: %w", err)
605 }
606
607 return nil
608 }
609
610 return nil
611}