forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/bluesky-social/jetstream/pkg/models"
11 tangled "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/appview/db"
13)
14
15type Ingester func(ctx context.Context, e *models.Event) error
16
17func jetstreamIngester(d db.DbWrapper) Ingester {
18 return func(ctx context.Context, e *models.Event) error {
19 var err error
20 defer func() {
21 eventTime := e.TimeUS
22 lastTimeUs := eventTime + 1
23 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
24 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
25 }
26 }()
27
28 if e.Kind != models.EventKindCommit {
29 return nil
30 }
31
32 did := e.Did
33 raw := json.RawMessage(e.Commit.Record)
34
35 switch e.Commit.Collection {
36 case tangled.GraphFollowNSID:
37 record := tangled.GraphFollow{}
38 err := json.Unmarshal(raw, &record)
39 if err != nil {
40 log.Println("invalid record")
41 return err
42 }
43 err = db.AddFollow(d, did, record.Subject, e.Commit.RKey)
44 if err != nil {
45 return fmt.Errorf("failed to add follow to db: %w", err)
46 }
47 case tangled.FeedStarNSID:
48 record := tangled.FeedStar{}
49 err := json.Unmarshal(raw, &record)
50 if err != nil {
51 log.Println("invalid record")
52 return err
53 }
54
55 subjectUri, err := syntax.ParseATURI(record.Subject)
56
57 if err != nil {
58 log.Println("invalid record")
59 return err
60 }
61
62 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
63 if err != nil {
64 return fmt.Errorf("failed to add follow to db: %w", err)
65 }
66 }
67
68 return err
69 }
70}