forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/config"
16 "tangled.sh/tangled.sh/core/appview/db"
17 "tangled.sh/tangled.sh/core/appview/idresolver"
18 "tangled.sh/tangled.sh/core/appview/spindleverify"
19 "tangled.sh/tangled.sh/core/rbac"
20)
21
22type Ingester struct {
23 Db db.DbWrapper
24 Enforcer *rbac.Enforcer
25 IdResolver *idresolver.Resolver
26 Config *config.Config
27 Logger *slog.Logger
28}
29
30type processFunc func(ctx context.Context, e *models.Event) error
31
32func (i *Ingester) Ingest() processFunc {
33 return func(ctx context.Context, e *models.Event) error {
34 var err error
35 defer func() {
36 eventTime := e.TimeUS
37 lastTimeUs := eventTime + 1
38 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
40 }
41 }()
42
43 if e.Kind != models.EventKindCommit {
44 return nil
45 }
46
47 switch e.Commit.Collection {
48 case tangled.GraphFollowNSID:
49 err = i.ingestFollow(e)
50 case tangled.FeedStarNSID:
51 err = i.ingestStar(e)
52 case tangled.PublicKeyNSID:
53 err = i.ingestPublicKey(e)
54 case tangled.RepoArtifactNSID:
55 err = i.ingestArtifact(e)
56 case tangled.ActorProfileNSID:
57 err = i.ingestProfile(e)
58 case tangled.SpindleMemberNSID:
59 err = i.ingestSpindleMember(e)
60 case tangled.SpindleNSID:
61 err = i.ingestSpindle(e)
62 }
63
64 if err != nil {
65 l := i.Logger.With("nsid", e.Commit.Collection)
66 l.Error("error ingesting record", "err", err)
67 }
68
69 return err
70 }
71}
72
73func (i *Ingester) ingestStar(e *models.Event) error {
74 var err error
75 did := e.Did
76
77 l := i.Logger.With("handler", "ingestStar")
78 l = l.With("nsid", e.Commit.Collection)
79
80 switch e.Commit.Operation {
81 case models.CommitOperationCreate, models.CommitOperationUpdate:
82 var subjectUri syntax.ATURI
83
84 raw := json.RawMessage(e.Commit.Record)
85 record := tangled.FeedStar{}
86 err := json.Unmarshal(raw, &record)
87 if err != nil {
88 l.Error("invalid record", "err", err)
89 return err
90 }
91
92 subjectUri, err = syntax.ParseATURI(record.Subject)
93 if err != nil {
94 l.Error("invalid record", "err", err)
95 return err
96 }
97 err = db.AddStar(i.Db, did, subjectUri, e.Commit.RKey)
98 case models.CommitOperationDelete:
99 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
100 }
101
102 if err != nil {
103 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
104 }
105
106 return nil
107}
108
109func (i *Ingester) ingestFollow(e *models.Event) error {
110 var err error
111 did := e.Did
112
113 l := i.Logger.With("handler", "ingestFollow")
114 l = l.With("nsid", e.Commit.Collection)
115
116 switch e.Commit.Operation {
117 case models.CommitOperationCreate, models.CommitOperationUpdate:
118 raw := json.RawMessage(e.Commit.Record)
119 record := tangled.GraphFollow{}
120 err = json.Unmarshal(raw, &record)
121 if err != nil {
122 l.Error("invalid record", "err", err)
123 return err
124 }
125
126 subjectDid := record.Subject
127 err = db.AddFollow(i.Db, did, subjectDid, e.Commit.RKey)
128 case models.CommitOperationDelete:
129 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
130 }
131
132 if err != nil {
133 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
134 }
135
136 return nil
137}
138
139func (i *Ingester) ingestPublicKey(e *models.Event) error {
140 did := e.Did
141 var err error
142
143 l := i.Logger.With("handler", "ingestPublicKey")
144 l = l.With("nsid", e.Commit.Collection)
145
146 switch e.Commit.Operation {
147 case models.CommitOperationCreate, models.CommitOperationUpdate:
148 l.Debug("processing add of pubkey")
149 raw := json.RawMessage(e.Commit.Record)
150 record := tangled.PublicKey{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 l.Error("invalid record", "err", err)
154 return err
155 }
156
157 name := record.Name
158 key := record.Key
159 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
160 case models.CommitOperationDelete:
161 l.Debug("processing delete of pubkey")
162 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
163 }
164
165 if err != nil {
166 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
167 }
168
169 return nil
170}
171
172func (i *Ingester) ingestArtifact(e *models.Event) error {
173 did := e.Did
174 var err error
175
176 l := i.Logger.With("handler", "ingestArtifact")
177 l = l.With("nsid", e.Commit.Collection)
178
179 switch e.Commit.Operation {
180 case models.CommitOperationCreate, models.CommitOperationUpdate:
181 raw := json.RawMessage(e.Commit.Record)
182 record := tangled.RepoArtifact{}
183 err = json.Unmarshal(raw, &record)
184 if err != nil {
185 l.Error("invalid record", "err", err)
186 return err
187 }
188
189 repoAt, err := syntax.ParseATURI(record.Repo)
190 if err != nil {
191 return err
192 }
193
194 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
195 if err != nil {
196 return err
197 }
198
199 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
200 if err != nil || !ok {
201 return err
202 }
203
204 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
205 if err != nil {
206 createdAt = time.Now()
207 }
208
209 artifact := db.Artifact{
210 Did: did,
211 Rkey: e.Commit.RKey,
212 RepoAt: repoAt,
213 Tag: plumbing.Hash(record.Tag),
214 CreatedAt: createdAt,
215 BlobCid: cid.Cid(record.Artifact.Ref),
216 Name: record.Name,
217 Size: uint64(record.Artifact.Size),
218 MimeType: record.Artifact.MimeType,
219 }
220
221 err = db.AddArtifact(i.Db, artifact)
222 case models.CommitOperationDelete:
223 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
224 }
225
226 if err != nil {
227 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
228 }
229
230 return nil
231}
232
233func (i *Ingester) ingestProfile(e *models.Event) error {
234 did := e.Did
235 var err error
236
237 l := i.Logger.With("handler", "ingestProfile")
238 l = l.With("nsid", e.Commit.Collection)
239
240 if e.Commit.RKey != "self" {
241 return fmt.Errorf("ingestProfile only ingests `self` record")
242 }
243
244 switch e.Commit.Operation {
245 case models.CommitOperationCreate, models.CommitOperationUpdate:
246 raw := json.RawMessage(e.Commit.Record)
247 record := tangled.ActorProfile{}
248 err = json.Unmarshal(raw, &record)
249 if err != nil {
250 l.Error("invalid record", "err", err)
251 return err
252 }
253
254 description := ""
255 if record.Description != nil {
256 description = *record.Description
257 }
258
259 includeBluesky := record.Bluesky
260
261 location := ""
262 if record.Location != nil {
263 location = *record.Location
264 }
265
266 var links [5]string
267 for i, l := range record.Links {
268 if i < 5 {
269 links[i] = l
270 }
271 }
272
273 var stats [2]db.VanityStat
274 for i, s := range record.Stats {
275 if i < 2 {
276 stats[i].Kind = db.VanityStatKind(s)
277 }
278 }
279
280 var pinned [6]syntax.ATURI
281 for i, r := range record.PinnedRepositories {
282 if i < 6 {
283 pinned[i] = syntax.ATURI(r)
284 }
285 }
286
287 profile := db.Profile{
288 Did: did,
289 Description: description,
290 IncludeBluesky: includeBluesky,
291 Location: location,
292 Links: links,
293 Stats: stats,
294 PinnedRepos: pinned,
295 }
296
297 ddb, ok := i.Db.Execer.(*db.DB)
298 if !ok {
299 return fmt.Errorf("failed to index profile record, invalid db cast")
300 }
301
302 tx, err := ddb.Begin()
303 if err != nil {
304 return fmt.Errorf("failed to start transaction")
305 }
306
307 err = db.ValidateProfile(tx, &profile)
308 if err != nil {
309 return fmt.Errorf("invalid profile record")
310 }
311
312 err = db.UpsertProfile(tx, &profile)
313 case models.CommitOperationDelete:
314 err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
315 }
316
317 if err != nil {
318 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
319 }
320
321 return nil
322}
323
324func (i *Ingester) ingestSpindleMember(e *models.Event) error {
325 did := e.Did
326 var err error
327
328 l := i.Logger.With("handler", "ingestSpindleMember")
329 l = l.With("nsid", e.Commit.Collection)
330
331 switch e.Commit.Operation {
332 case models.CommitOperationCreate:
333 raw := json.RawMessage(e.Commit.Record)
334 record := tangled.SpindleMember{}
335 err = json.Unmarshal(raw, &record)
336 if err != nil {
337 l.Error("invalid record", "err", err)
338 return err
339 }
340
341 // only spindle owner can invite to spindles
342 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
343 if err != nil || !ok {
344 return fmt.Errorf("failed to enforce permissions: %w", err)
345 }
346
347 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
348 if err != nil {
349 return err
350 }
351
352 if memberId.Handle.IsInvalidHandle() {
353 return err
354 }
355
356 ddb, ok := i.Db.Execer.(*db.DB)
357 if !ok {
358 return fmt.Errorf("failed to index profile record, invalid db cast")
359 }
360
361 err = db.AddSpindleMember(ddb, db.SpindleMember{
362 Did: syntax.DID(did),
363 Rkey: e.Commit.RKey,
364 Instance: record.Instance,
365 Subject: memberId.DID,
366 })
367 if !ok {
368 return fmt.Errorf("failed to add to db: %w", err)
369 }
370
371 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
372 if err != nil {
373 return fmt.Errorf("failed to update ACLs: %w", err)
374 }
375 case models.CommitOperationDelete:
376 rkey := e.Commit.RKey
377
378 ddb, ok := i.Db.Execer.(*db.DB)
379 if !ok {
380 return fmt.Errorf("failed to index profile record, invalid db cast")
381 }
382
383 // get record from db first
384 members, err := db.GetSpindleMembers(
385 ddb,
386 db.FilterEq("did", did),
387 db.FilterEq("rkey", rkey),
388 )
389 if err != nil || len(members) != 1 {
390 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
391 }
392 member := members[0]
393
394 tx, err := ddb.Begin()
395 if err != nil {
396 return fmt.Errorf("failed to start txn: %w", err)
397 }
398
399 // remove record by rkey && update enforcer
400 if err = db.RemoveSpindleMember(
401 tx,
402 db.FilterEq("did", did),
403 db.FilterEq("rkey", rkey),
404 ); err != nil {
405 return fmt.Errorf("failed to remove from db: %w", err)
406 }
407
408 // update enforcer
409 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
410 if err != nil {
411 return fmt.Errorf("failed to update ACLs: %w", err)
412 }
413
414 if err = tx.Commit(); err != nil {
415 return fmt.Errorf("failed to commit txn: %w", err)
416 }
417
418 if err = i.Enforcer.E.SavePolicy(); err != nil {
419 return fmt.Errorf("failed to save ACLs: %w", err)
420 }
421 }
422
423 return nil
424}
425
426func (i *Ingester) ingestSpindle(e *models.Event) error {
427 did := e.Did
428 var err error
429
430 l := i.Logger.With("handler", "ingestSpindle")
431 l = l.With("nsid", e.Commit.Collection)
432
433 switch e.Commit.Operation {
434 case models.CommitOperationCreate:
435 raw := json.RawMessage(e.Commit.Record)
436 record := tangled.Spindle{}
437 err = json.Unmarshal(raw, &record)
438 if err != nil {
439 l.Error("invalid record", "err", err)
440 return err
441 }
442
443 instance := e.Commit.RKey
444
445 ddb, ok := i.Db.Execer.(*db.DB)
446 if !ok {
447 return fmt.Errorf("failed to index profile record, invalid db cast")
448 }
449
450 err := db.AddSpindle(ddb, db.Spindle{
451 Owner: syntax.DID(did),
452 Instance: instance,
453 })
454 if err != nil {
455 l.Error("failed to add spindle to db", "err", err, "instance", instance)
456 return err
457 }
458
459 err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev)
460 if err != nil {
461 l.Error("failed to add spindle to db", "err", err, "instance", instance)
462 return err
463 }
464
465 _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did)
466 if err != nil {
467 return fmt.Errorf("failed to mark verified: %w", err)
468 }
469
470 return nil
471
472 case models.CommitOperationDelete:
473 instance := e.Commit.RKey
474
475 ddb, ok := i.Db.Execer.(*db.DB)
476 if !ok {
477 return fmt.Errorf("failed to index profile record, invalid db cast")
478 }
479
480 // get record from db first
481 spindles, err := db.GetSpindles(
482 ddb,
483 db.FilterEq("owner", did),
484 db.FilterEq("instance", instance),
485 )
486 if err != nil || len(spindles) != 1 {
487 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
488 }
489
490 tx, err := ddb.Begin()
491 if err != nil {
492 return err
493 }
494 defer func() {
495 tx.Rollback()
496 i.Enforcer.E.LoadPolicy()
497 }()
498
499 err = db.DeleteSpindle(
500 tx,
501 db.FilterEq("owner", did),
502 db.FilterEq("instance", instance),
503 )
504 if err != nil {
505 return err
506 }
507
508 err = i.Enforcer.RemoveSpindle(instance)
509 if err != nil {
510 return err
511 }
512
513 err = tx.Commit()
514 if err != nil {
515 return err
516 }
517
518 err = i.Enforcer.E.SavePolicy()
519 if err != nil {
520 return err
521 }
522 }
523
524 return nil
525}