1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8 "net/http"
9 "net/url"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 appbsky "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events/schedulers/parallel"
15 lexutil "github.com/bluesky-social/indigo/lex/util"
16
17 "github.com/bluesky-social/indigo/events"
18 "github.com/bluesky-social/indigo/repo"
19 "github.com/bluesky-social/indigo/repomgr"
20 "github.com/carlmjohnson/versioninfo"
21 "github.com/gorilla/websocket"
22)
23
24func RunFirehoseConsumer(ctx context.Context, logger *slog.Logger, relayHost string, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error {
25
26 dialer := websocket.DefaultDialer
27 u, err := url.Parse(relayHost)
28 if err != nil {
29 return fmt.Errorf("invalid relayHost URI: %w", err)
30 }
31 // always continue at the current cursor offset (don't provide cursor query param)
32 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
33 logger.Info("subscribing to repo event stream", "upstream", relayHost)
34 con, _, err := dialer.Dial(u.String(), http.Header{
35 "User-Agent": []string{fmt.Sprintf("beemo/%s", versioninfo.Short())},
36 })
37 if err != nil {
38 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
39 }
40
41 rsc := &events.RepoStreamCallbacks{
42 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
43 return HandleRepoCommit(ctx, logger, evt, postCallback)
44 },
45 // NOTE: could add other callbacks as needed
46 }
47
48 var scheduler events.Scheduler
49 // use parallel scheduler
50 parallelism := 4
51 scheduler = parallel.NewScheduler(
52 parallelism,
53 1000,
54 relayHost,
55 rsc.EventHandler,
56 )
57 logger.Info("beemo firehose scheduler configured", "scheduler", "parallel", "workers", parallelism)
58
59 return events.HandleRepoStream(ctx, con, scheduler, logger)
60}
61
62// NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better.
63func HandleRepoCommit(ctx context.Context, logger *slog.Logger, evt *comatproto.SyncSubscribeRepos_Commit, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error {
64
65 logger = logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
66 logger.Debug("received commit event")
67
68 if evt.TooBig {
69 logger.Warn("skipping tooBig events for now")
70 return nil
71 }
72
73 did, err := syntax.ParseDID(evt.Repo)
74 if err != nil {
75 logger.Error("bad DID syntax in event", "err", err)
76 return nil
77 }
78
79 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
80 if err != nil {
81 logger.Error("failed to read repo from car", "err", err)
82 return nil
83 }
84
85 for _, op := range evt.Ops {
86 logger = logger.With("eventKind", op.Action, "path", op.Path)
87 collection, rkey, err := syntax.ParseRepoPath(op.Path)
88 if err != nil {
89 logger.Error("invalid path in repo op")
90 return nil
91 }
92
93 ek := repomgr.EventKind(op.Action)
94 switch ek {
95 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
96 // read the record bytes from blocks, and verify CID
97 rc, recordCBOR, err := rr.GetRecordBytes(ctx, op.Path)
98 if err != nil {
99 logger.Error("reading record from event blocks (CAR)", "err", err)
100 continue
101 }
102 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
103 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
104 continue
105 }
106
107 switch collection {
108 case "app.bsky.feed.post":
109 var post appbsky.FeedPost
110 if err := post.UnmarshalCBOR(bytes.NewReader(*recordCBOR)); err != nil {
111 logger.Error("failed to parse app.bsky.feed.post record", "err", err)
112 continue
113 }
114 if err := postCallback(ctx, did, rkey, post); err != nil {
115 logger.Error("failed to process post record", "err", err)
116 continue
117 }
118 }
119
120 default:
121 // ignore other events
122 }
123 }
124
125 return nil
126}