forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/spindleverify"
18 "tangled.sh/tangled.sh/core/idresolver"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 l := i.Logger.With("kind", e.Kind)
44 switch e.Kind {
45 case models.EventKindAccount:
46 if !e.Account.Active && *e.Account.Status == "deactivated" {
47 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
48 }
49 case models.EventKindIdentity:
50 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
51 case models.EventKindCommit:
52 switch e.Commit.Collection {
53 case tangled.GraphFollowNSID:
54 err = i.ingestFollow(e)
55 case tangled.FeedStarNSID:
56 err = i.ingestStar(e)
57 case tangled.PublicKeyNSID:
58 err = i.ingestPublicKey(e)
59 case tangled.RepoArtifactNSID:
60 err = i.ingestArtifact(e)
61 case tangled.ActorProfileNSID:
62 err = i.ingestProfile(e)
63 case tangled.SpindleMemberNSID:
64 err = i.ingestSpindleMember(e)
65 case tangled.SpindleNSID:
66 err = i.ingestSpindle(e)
67 }
68 l = i.Logger.With("nsid", e.Commit.Collection)
69 }
70
71 if err != nil {
72 l.Error("error ingesting record", "err", err)
73 }
74
75 return err
76 }
77}
78
79func (i *Ingester) ingestStar(e *models.Event) error {
80 var err error
81 did := e.Did
82
83 l := i.Logger.With("handler", "ingestStar")
84 l = l.With("nsid", e.Commit.Collection)
85
86 switch e.Commit.Operation {
87 case models.CommitOperationCreate, models.CommitOperationUpdate:
88 var subjectUri syntax.ATURI
89
90 raw := json.RawMessage(e.Commit.Record)
91 record := tangled.FeedStar{}
92 err := json.Unmarshal(raw, &record)
93 if err != nil {
94 l.Error("invalid record", "err", err)
95 return err
96 }
97
98 subjectUri, err = syntax.ParseATURI(record.Subject)
99 if err != nil {
100 l.Error("invalid record", "err", err)
101 return err
102 }
103 err = db.AddStar(i.Db, &db.Star{
104 StarredByDid: did,
105 RepoAt: subjectUri,
106 Rkey: e.Commit.RKey,
107 })
108 case models.CommitOperationDelete:
109 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
110 }
111
112 if err != nil {
113 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
114 }
115
116 return nil
117}
118
119func (i *Ingester) ingestFollow(e *models.Event) error {
120 var err error
121 did := e.Did
122
123 l := i.Logger.With("handler", "ingestFollow")
124 l = l.With("nsid", e.Commit.Collection)
125
126 switch e.Commit.Operation {
127 case models.CommitOperationCreate, models.CommitOperationUpdate:
128 raw := json.RawMessage(e.Commit.Record)
129 record := tangled.GraphFollow{}
130 err = json.Unmarshal(raw, &record)
131 if err != nil {
132 l.Error("invalid record", "err", err)
133 return err
134 }
135
136 err = db.AddFollow(i.Db, &db.Follow{
137 UserDid: did,
138 SubjectDid: record.Subject,
139 Rkey: e.Commit.RKey,
140 })
141 case models.CommitOperationDelete:
142 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
143 }
144
145 if err != nil {
146 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
147 }
148
149 return nil
150}
151
152func (i *Ingester) ingestPublicKey(e *models.Event) error {
153 did := e.Did
154 var err error
155
156 l := i.Logger.With("handler", "ingestPublicKey")
157 l = l.With("nsid", e.Commit.Collection)
158
159 switch e.Commit.Operation {
160 case models.CommitOperationCreate, models.CommitOperationUpdate:
161 l.Debug("processing add of pubkey")
162 raw := json.RawMessage(e.Commit.Record)
163 record := tangled.PublicKey{}
164 err = json.Unmarshal(raw, &record)
165 if err != nil {
166 l.Error("invalid record", "err", err)
167 return err
168 }
169
170 name := record.Name
171 key := record.Key
172 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
173 case models.CommitOperationDelete:
174 l.Debug("processing delete of pubkey")
175 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
176 }
177
178 if err != nil {
179 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
180 }
181
182 return nil
183}
184
185func (i *Ingester) ingestArtifact(e *models.Event) error {
186 did := e.Did
187 var err error
188
189 l := i.Logger.With("handler", "ingestArtifact")
190 l = l.With("nsid", e.Commit.Collection)
191
192 switch e.Commit.Operation {
193 case models.CommitOperationCreate, models.CommitOperationUpdate:
194 raw := json.RawMessage(e.Commit.Record)
195 record := tangled.RepoArtifact{}
196 err = json.Unmarshal(raw, &record)
197 if err != nil {
198 l.Error("invalid record", "err", err)
199 return err
200 }
201
202 repoAt, err := syntax.ParseATURI(record.Repo)
203 if err != nil {
204 return err
205 }
206
207 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
208 if err != nil {
209 return err
210 }
211
212 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
213 if err != nil || !ok {
214 return err
215 }
216
217 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
218 if err != nil {
219 createdAt = time.Now()
220 }
221
222 artifact := db.Artifact{
223 Did: did,
224 Rkey: e.Commit.RKey,
225 RepoAt: repoAt,
226 Tag: plumbing.Hash(record.Tag),
227 CreatedAt: createdAt,
228 BlobCid: cid.Cid(record.Artifact.Ref),
229 Name: record.Name,
230 Size: uint64(record.Artifact.Size),
231 MimeType: record.Artifact.MimeType,
232 }
233
234 err = db.AddArtifact(i.Db, artifact)
235 case models.CommitOperationDelete:
236 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
237 }
238
239 if err != nil {
240 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
241 }
242
243 return nil
244}
245
246func (i *Ingester) ingestProfile(e *models.Event) error {
247 did := e.Did
248 var err error
249
250 l := i.Logger.With("handler", "ingestProfile")
251 l = l.With("nsid", e.Commit.Collection)
252
253 if e.Commit.RKey != "self" {
254 return fmt.Errorf("ingestProfile only ingests `self` record")
255 }
256
257 switch e.Commit.Operation {
258 case models.CommitOperationCreate, models.CommitOperationUpdate:
259 raw := json.RawMessage(e.Commit.Record)
260 record := tangled.ActorProfile{}
261 err = json.Unmarshal(raw, &record)
262 if err != nil {
263 l.Error("invalid record", "err", err)
264 return err
265 }
266
267 description := ""
268 if record.Description != nil {
269 description = *record.Description
270 }
271
272 includeBluesky := record.Bluesky
273
274 location := ""
275 if record.Location != nil {
276 location = *record.Location
277 }
278
279 var links [5]string
280 for i, l := range record.Links {
281 if i < 5 {
282 links[i] = l
283 }
284 }
285
286 var stats [2]db.VanityStat
287 for i, s := range record.Stats {
288 if i < 2 {
289 stats[i].Kind = db.VanityStatKind(s)
290 }
291 }
292
293 var pinned [6]syntax.ATURI
294 for i, r := range record.PinnedRepositories {
295 if i < 6 {
296 pinned[i] = syntax.ATURI(r)
297 }
298 }
299
300 profile := db.Profile{
301 Did: did,
302 Description: description,
303 IncludeBluesky: includeBluesky,
304 Location: location,
305 Links: links,
306 Stats: stats,
307 PinnedRepos: pinned,
308 }
309
310 ddb, ok := i.Db.Execer.(*db.DB)
311 if !ok {
312 return fmt.Errorf("failed to index profile record, invalid db cast")
313 }
314
315 tx, err := ddb.Begin()
316 if err != nil {
317 return fmt.Errorf("failed to start transaction")
318 }
319
320 err = db.ValidateProfile(tx, &profile)
321 if err != nil {
322 return fmt.Errorf("invalid profile record")
323 }
324
325 err = db.UpsertProfile(tx, &profile)
326 case models.CommitOperationDelete:
327 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
328 }
329
330 if err != nil {
331 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
332 }
333
334 return nil
335}
336
337func (i *Ingester) ingestSpindleMember(e *models.Event) error {
338 did := e.Did
339 var err error
340
341 l := i.Logger.With("handler", "ingestSpindleMember")
342 l = l.With("nsid", e.Commit.Collection)
343
344 switch e.Commit.Operation {
345 case models.CommitOperationCreate:
346 raw := json.RawMessage(e.Commit.Record)
347 record := tangled.SpindleMember{}
348 err = json.Unmarshal(raw, &record)
349 if err != nil {
350 l.Error("invalid record", "err", err)
351 return err
352 }
353
354 // only spindle owner can invite to spindles
355 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
356 if err != nil || !ok {
357 return fmt.Errorf("failed to enforce permissions: %w", err)
358 }
359
360 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
361 if err != nil {
362 return err
363 }
364
365 if memberId.Handle.IsInvalidHandle() {
366 return err
367 }
368
369 ddb, ok := i.Db.Execer.(*db.DB)
370 if !ok {
371 return fmt.Errorf("failed to index profile record, invalid db cast")
372 }
373
374 err = db.AddSpindleMember(ddb, db.SpindleMember{
375 Did: syntax.DID(did),
376 Rkey: e.Commit.RKey,
377 Instance: record.Instance,
378 Subject: memberId.DID,
379 })
380 if !ok {
381 return fmt.Errorf("failed to add to db: %w", err)
382 }
383
384 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
385 if err != nil {
386 return fmt.Errorf("failed to update ACLs: %w", err)
387 }
388 case models.CommitOperationDelete:
389 rkey := e.Commit.RKey
390
391 ddb, ok := i.Db.Execer.(*db.DB)
392 if !ok {
393 return fmt.Errorf("failed to index profile record, invalid db cast")
394 }
395
396 // get record from db first
397 members, err := db.GetSpindleMembers(
398 ddb,
399 db.FilterEq("did", did),
400 db.FilterEq("rkey", rkey),
401 )
402 if err != nil || len(members) != 1 {
403 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
404 }
405 member := members[0]
406
407 tx, err := ddb.Begin()
408 if err != nil {
409 return fmt.Errorf("failed to start txn: %w", err)
410 }
411
412 // remove record by rkey && update enforcer
413 if err = db.RemoveSpindleMember(
414 tx,
415 db.FilterEq("did", did),
416 db.FilterEq("rkey", rkey),
417 ); err != nil {
418 return fmt.Errorf("failed to remove from db: %w", err)
419 }
420
421 // update enforcer
422 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
423 if err != nil {
424 return fmt.Errorf("failed to update ACLs: %w", err)
425 }
426
427 if err = tx.Commit(); err != nil {
428 return fmt.Errorf("failed to commit txn: %w", err)
429 }
430
431 if err = i.Enforcer.E.SavePolicy(); err != nil {
432 return fmt.Errorf("failed to save ACLs: %w", err)
433 }
434 }
435
436 return nil
437}
438
439func (i *Ingester) ingestSpindle(e *models.Event) error {
440 did := e.Did
441 var err error
442
443 l := i.Logger.With("handler", "ingestSpindle")
444 l = l.With("nsid", e.Commit.Collection)
445
446 switch e.Commit.Operation {
447 case models.CommitOperationCreate:
448 raw := json.RawMessage(e.Commit.Record)
449 record := tangled.Spindle{}
450 err = json.Unmarshal(raw, &record)
451 if err != nil {
452 l.Error("invalid record", "err", err)
453 return err
454 }
455
456 instance := e.Commit.RKey
457
458 ddb, ok := i.Db.Execer.(*db.DB)
459 if !ok {
460 return fmt.Errorf("failed to index profile record, invalid db cast")
461 }
462
463 err := db.AddSpindle(ddb, db.Spindle{
464 Owner: syntax.DID(did),
465 Instance: instance,
466 })
467 if err != nil {
468 l.Error("failed to add spindle to db", "err", err, "instance", instance)
469 return err
470 }
471
472 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
473 if err != nil {
474 l.Error("failed to add spindle to db", "err", err, "instance", instance)
475 return err
476 }
477
478 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
479 if err != nil {
480 return fmt.Errorf("failed to mark verified: %w", err)
481 }
482
483 return nil
484
485 case models.CommitOperationDelete:
486 instance := e.Commit.RKey
487
488 ddb, ok := i.Db.Execer.(*db.DB)
489 if !ok {
490 return fmt.Errorf("failed to index profile record, invalid db cast")
491 }
492
493 // get record from db first
494 spindles, err := db.GetSpindles(
495 ddb,
496 db.FilterEq("owner", did),
497 db.FilterEq("instance", instance),
498 )
499 if err != nil || len(spindles) != 1 {
500 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
501 }
502 spindle := spindles[0]
503
504 tx, err := ddb.Begin()
505 if err != nil {
506 return err
507 }
508 defer func() {
509 tx.Rollback()
510 i.Enforcer.E.LoadPolicy()
511 }()
512
513 err = db.DeleteSpindle(
514 tx,
515 db.FilterEq("owner", did),
516 db.FilterEq("instance", instance),
517 )
518 if err != nil {
519 return err
520 }
521
522 if spindle.Verified != nil {
523 err = i.Enforcer.RemoveSpindle(instance)
524 if err != nil {
525 return err
526 }
527 }
528
529 err = tx.Commit()
530 if err != nil {
531 return err
532 }
533
534 err = i.Enforcer.E.SavePolicy()
535 if err != nil {
536 return err
537 }
538 }
539
540 return nil
541}