https://github.com/bluesky-social/goat but with tangled's CI
1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "net/http"
11 "net/url"
12 "os"
13 "strings"
14 "time"
15
16 comatproto "github.com/bluesky-social/indigo/api/atproto"
17 "github.com/bluesky-social/indigo/atproto/data"
18 "github.com/bluesky-social/indigo/atproto/identity"
19 "github.com/bluesky-social/indigo/atproto/repo"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 "github.com/bluesky-social/indigo/events"
22 "github.com/bluesky-social/indigo/events/schedulers/parallel"
23 lexutil "github.com/bluesky-social/indigo/lex/util"
24
25 "github.com/gorilla/websocket"
26 "github.com/urfave/cli/v3"
27)
28
29var cmdFirehose = &cli.Command{
30 Name: "firehose",
31 Usage: "stream repo and identity events",
32 Flags: []cli.Flag{
33 &cli.StringFlag{
34 Name: "relay-host",
35 Usage: "method, hostname, and port of Relay instance (websocket)",
36 Value: "wss://bsky.network",
37 Sources: cli.EnvVars("ATP_RELAY_HOST", "RELAY_HOST"),
38 },
39 &cli.IntFlag{
40 Name: "cursor",
41 Usage: "cursor to consume at",
42 },
43 &cli.StringSliceFlag{
44 Name: "collection",
45 Aliases: []string{"c"},
46 Usage: "filter to specific record types (NSID)",
47 },
48 &cli.BoolFlag{
49 Name: "account-events",
50 Usage: "only print account and identity events",
51 },
52 &cli.BoolFlag{
53 Name: "blocks",
54 Usage: "include blocks as base64 in payload",
55 },
56 &cli.BoolFlag{
57 Name: "quiet",
58 Aliases: []string{"q"},
59 Usage: "don't actually print events to stdout (eg, errors only)",
60 },
61 &cli.BoolFlag{
62 Name: "verify-basic",
63 Usage: "parse events and do basic syntax and structure checks",
64 },
65 &cli.BoolFlag{
66 Name: "verify-sig",
67 Usage: "verify account signatures on commits",
68 },
69 &cli.BoolFlag{
70 Name: "verify-mst",
71 Usage: "run inductive verification of ops and MST structure",
72 },
73 &cli.BoolFlag{
74 Name: "ops",
75 Aliases: []string{"records"},
76 Usage: "instead of printing entire events, print individual record ops",
77 },
78 },
79 Action: runFirehose,
80}
81
82type GoatFirehoseConsumer struct {
83 OpsMode bool
84 AccountsOnly bool
85 Quiet bool
86 Blocks bool
87 VerifyBasic bool
88 VerifySig bool
89 VerifyMST bool
90 // filter to specified collections
91 CollectionFilter []string
92 // for signature verification
93 Dir identity.Directory
94}
95
96func runFirehose(ctx context.Context, cmd *cli.Command) error {
97
98 slog.SetDefault(configLogger(cmd, os.Stderr))
99
100 // main thing is skipping handle verification
101 bdir := identity.BaseDirectory{
102 SkipHandleVerification: true,
103 TryAuthoritativeDNS: false,
104 SkipDNSDomainSuffixes: []string{".bsky.social"},
105 UserAgent: *userAgent(),
106 }
107 cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5)
108
109 gfc := GoatFirehoseConsumer{
110 OpsMode: cmd.Bool("ops"),
111 AccountsOnly: cmd.Bool("account-events"),
112 CollectionFilter: cmd.StringSlice("collection"),
113 Quiet: cmd.Bool("quiet"),
114 Blocks: cmd.Bool("blocks"),
115 VerifyBasic: cmd.Bool("verify-basic"),
116 VerifySig: cmd.Bool("verify-sig"),
117 VerifyMST: cmd.Bool("verify-mst"),
118 Dir: &cdir,
119 }
120
121 var relayHost string
122 if cmd.IsSet("relay-host") {
123 if cmd.Args().Len() != 0 {
124 return errors.New("error: unused positional args")
125 }
126 relayHost = cmd.String("relay-host")
127 } else {
128 if cmd.Args().Len() == 1 {
129 relayHost = cmd.Args().First()
130 } else if cmd.Args().Len() > 1 {
131 return errors.New("can only have at most one relay-host")
132 } else {
133 relayHost = cmd.String("relay-host")
134 }
135 }
136
137 dialer := websocket.DefaultDialer
138 u, err := url.Parse(relayHost)
139 if err != nil {
140 return fmt.Errorf("invalid relayHost URI: %w", err)
141 }
142 switch u.Scheme {
143 case "http":
144 u.Scheme = "ws"
145 case "https":
146 u.Scheme = "wss"
147 }
148 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
149 if cmd.IsSet("cursor") {
150 u.RawQuery = fmt.Sprintf("cursor=%d", cmd.Int("cursor"))
151 }
152 urlString := u.String()
153 con, _, err := dialer.Dial(urlString, http.Header{
154 "User-Agent": []string{*userAgent()},
155 })
156 if err != nil {
157 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
158 }
159
160 rsc := &events.RepoStreamCallbacks{
161 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
162 //slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq)
163 if !gfc.AccountsOnly && !gfc.OpsMode {
164 return gfc.handleCommitEvent(ctx, evt)
165 } else if !gfc.AccountsOnly && gfc.OpsMode {
166 return gfc.handleCommitEventOps(ctx, evt)
167 }
168 return nil
169 },
170 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error {
171 //slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq)
172 if !gfc.AccountsOnly && !gfc.OpsMode {
173 return gfc.handleSyncEvent(ctx, evt)
174 }
175 return nil
176 },
177 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
178 //slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq)
179 if !gfc.OpsMode {
180 return gfc.handleIdentityEvent(ctx, evt)
181 }
182 return nil
183 },
184 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
185 //slog.Debug("account event", "did", evt.Did, "seq", evt.Seq)
186 if !gfc.OpsMode {
187 return gfc.handleAccountEvent(ctx, evt)
188 }
189 return nil
190 },
191 }
192
193 scheduler := parallel.NewScheduler(
194 1,
195 100,
196 relayHost,
197 rsc.EventHandler,
198 )
199 slog.Info("starting firehose consumer", "relayHost", relayHost)
200 return events.HandleRepoStream(ctx, con, scheduler, nil)
201}
202
203func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error {
204 if gfc.VerifySig {
205 did, err := syntax.ParseDID(evt.Did)
206 if err != nil {
207 return err
208 }
209 gfc.Dir.Purge(ctx, did.AtIdentifier())
210 }
211 if gfc.VerifyBasic {
212 if _, err := syntax.ParseDID(evt.Did); err != nil {
213 slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq)
214 }
215 }
216 if gfc.Quiet {
217 return nil
218 }
219 out := make(map[string]interface{})
220 out["type"] = "identity"
221 out["payload"] = evt
222 b, err := json.Marshal(out)
223 if err != nil {
224 return err
225 }
226 fmt.Println(string(b))
227 return nil
228}
229
230func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error {
231 if gfc.VerifyBasic {
232 if _, err := syntax.ParseDID(evt.Did); err != nil {
233 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
234 }
235 }
236 if gfc.Quiet {
237 return nil
238 }
239 out := make(map[string]interface{})
240 out["type"] = "account"
241 out["payload"] = evt
242 b, err := json.Marshal(out)
243 if err != nil {
244 return err
245 }
246 fmt.Println(string(b))
247 return nil
248}
249
250func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error {
251 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks))
252 if err != nil {
253 return err
254 }
255 if gfc.VerifyBasic {
256 if err := commit.VerifyStructure(); err != nil {
257 slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err)
258 }
259 if _, err := syntax.ParseDID(evt.Did); err != nil {
260 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
261 }
262 }
263 if gfc.Quiet {
264 return nil
265 }
266 if !gfc.Blocks {
267 evt.Blocks = nil
268 }
269 out := make(map[string]interface{})
270 out["type"] = "sync"
271 out["commit"] = commit.AsData() // NOTE: funky, but helpful, to include this in output
272 out["payload"] = evt
273 b, err := json.Marshal(out)
274 if err != nil {
275 return err
276 }
277 fmt.Println(string(b))
278 return nil
279}
280
281// this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks
282func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
283
284 if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST {
285
286 logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev)
287
288 did, err := syntax.ParseDID(evt.Repo)
289 if err != nil {
290 return err
291 }
292
293 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks))
294 if err != nil {
295 return err
296 }
297
298 if gfc.VerifySig {
299 ident, err := gfc.Dir.LookupDID(ctx, did)
300 if err != nil {
301 return err
302 }
303 pubkey, err := ident.PublicKey()
304 if err != nil {
305 return err
306 }
307 logger = logger.With("pds", ident.PDSEndpoint())
308 if err := commit.VerifySignature(pubkey); err != nil {
309 logger.Warn("commit signature validation failed", "err", err)
310 }
311 }
312
313 if len(evt.Blocks) == 0 {
314 logger.Warn("commit message missing blocks")
315 }
316
317 if gfc.VerifyBasic {
318 // the commit itself
319 if err := commit.VerifyStructure(); err != nil {
320 logger.Warn("bad commit object", "err", err)
321 }
322 // the event fields
323 rev, err := syntax.ParseTID(evt.Rev)
324 if err != nil {
325 logger.Warn("bad TID syntax in commit rev", "err", err)
326 }
327 if rev.String() != commit.Rev {
328 logger.Warn("event rev != commit rev", "commitRev", commit.Rev)
329 }
330 if did.String() != commit.DID {
331 logger.Warn("event DID != commit DID", "commitDID", commit.DID)
332 }
333 _, err = syntax.ParseDatetime(evt.Time)
334 if err != nil {
335 logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err)
336 }
337 if evt.TooBig {
338 logger.Warn("deprecated tooBig commit flag set")
339 }
340 if evt.Rebase {
341 logger.Warn("deprecated rebase commit flag set")
342 }
343 }
344
345 if gfc.VerifyMST {
346 if evt.PrevData == nil {
347 logger.Warn("prevData is nil, skipping MST check")
348 } else {
349 // TODO: break out this function in to smaller chunks
350 if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil {
351 logger.Warn("failed to invert commit MST", "err", err)
352 }
353 }
354 }
355 }
356
357 if gfc.Quiet {
358 return nil
359 }
360
361 // apply collections filter
362 if len(gfc.CollectionFilter) > 0 {
363 keep := false
364 for _, op := range evt.Ops {
365 parts := strings.SplitN(op.Path, "/", 3)
366 if len(parts) != 2 {
367 slog.Error("invalid record path", "path", op.Path)
368 return nil
369 }
370 collection := parts[0]
371 for _, c := range gfc.CollectionFilter {
372 if c == collection {
373 keep = true
374 break
375 }
376 }
377 if keep {
378 break
379 }
380 }
381 if !keep {
382 return nil
383 }
384 }
385
386 if !gfc.Blocks {
387 evt.Blocks = nil
388 }
389 out := make(map[string]interface{})
390 out["type"] = "commit"
391 out["payload"] = evt
392 b, err := json.Marshal(out)
393 if err != nil {
394 return err
395 }
396 fmt.Println(string(b))
397 return nil
398}
399
400func (gfc *GoatFirehoseConsumer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
401 logger := slog.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
402
403 if evt.TooBig {
404 logger.Warn("skipping tooBig events for now")
405 return nil
406 }
407
408 _, rr, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(evt.Blocks))
409 if err != nil {
410 logger.Error("failed to read repo from car", "err", err)
411 return nil
412 }
413
414 for _, op := range evt.Ops {
415 collection, rkey, err := syntax.ParseRepoPath(op.Path)
416 if err != nil {
417 logger.Error("invalid path in repo op", "eventKind", op.Action, "path", op.Path)
418 return nil
419 }
420 logger = logger.With("eventKind", op.Action, "collection", collection, "rkey", rkey)
421
422 if len(gfc.CollectionFilter) > 0 {
423 keep := false
424 for _, c := range gfc.CollectionFilter {
425 if collection.String() == c {
426 keep = true
427 break
428 }
429 }
430 if !keep {
431 continue
432 }
433 }
434
435 out := make(map[string]interface{})
436 out["seq"] = evt.Seq
437 out["rev"] = evt.Rev
438 out["time"] = evt.Time
439 out["collection"] = collection
440 out["rkey"] = rkey
441
442 switch op.Action {
443 case "create", "update":
444 coll, rkey, err := syntax.ParseRepoPath(op.Path)
445 if err != nil {
446 return err
447 }
448 // read the record bytes from blocks, and verify CID
449 recBytes, rc, err := rr.GetRecordBytes(ctx, coll, rkey)
450 if err != nil {
451 logger.Error("reading record from event blocks (CAR)", "err", err)
452 break
453 }
454 if op.Cid == nil || lexutil.LexLink(*rc) != *op.Cid {
455 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
456 break
457 }
458
459 out["action"] = op.Action
460 d, err := data.UnmarshalCBOR(recBytes)
461 if err != nil {
462 slog.Warn("failed to parse record CBOR")
463 continue
464 }
465 out["cid"] = op.Cid.String()
466 out["record"] = d
467 b, err := json.Marshal(out)
468 if err != nil {
469 return err
470 }
471 if !gfc.Quiet {
472 fmt.Println(string(b))
473 }
474 case "delete":
475 out["action"] = "delete"
476 b, err := json.Marshal(out)
477 if err != nil {
478 return err
479 }
480 if !gfc.Quiet {
481 fmt.Println(string(b))
482 }
483 default:
484 logger.Error("unexpected record op kind")
485 }
486 }
487 return nil
488}