1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "net/http"
11 "net/url"
12 "os"
13 "strings"
14 "time"
15
16 comatproto "github.com/bluesky-social/indigo/api/atproto"
17 "github.com/bluesky-social/indigo/atproto/data"
18 "github.com/bluesky-social/indigo/atproto/identity"
19 "github.com/bluesky-social/indigo/atproto/repo"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 "github.com/bluesky-social/indigo/events"
22 "github.com/bluesky-social/indigo/events/schedulers/parallel"
23 lexutil "github.com/bluesky-social/indigo/lex/util"
24
25 "github.com/gorilla/websocket"
26 "github.com/urfave/cli/v2"
27)
28
// cmdFirehose declares the "firehose" subcommand: the flags it accepts and
// the action (runFirehose) that connects to a relay and processes events.
var cmdFirehose = &cli.Command{
	Name: "firehose",
	Usage: "stream repo and identity events",
	Flags: []cli.Flag{
		// Relay base URL. runFirehose also accepts the host as a single
		// positional argument when this flag is not set explicitly.
		&cli.StringFlag{
			Name: "relay-host",
			Usage: "method, hostname, and port of Relay instance (websocket)",
			Value: "wss://bsky.network",
			EnvVars: []string{"ATP_RELAY_HOST", "RELAY_HOST"},
		},
		// Only sent to the relay (as the "cursor" query parameter) when set.
		&cli.IntFlag{
			Name: "cursor",
			Usage: "cursor to consume at",
		},
		// May be given multiple times; each value is one collection to keep.
		&cli.StringSliceFlag{
			Name: "collection",
			Aliases: []string{"c"},
			Usage: "filter to specific record types (NSID)",
		},
		// When set, commit and sync events are not handled at all.
		&cli.BoolFlag{
			Name: "account-events",
			Usage: "only print account and identity events",
		},
		// When unset, the blocks field is cleared before an event is printed.
		&cli.BoolFlag{
			Name: "blocks",
			Usage: "include blocks as base64 in payload",
		},
		&cli.BoolFlag{
			Name: "quiet",
			Aliases: []string{"q"},
			Usage: "don't actually print events to stdout (eg, errors only)",
		},
		// The three verify-* flags are independent and can be combined.
		&cli.BoolFlag{
			Name: "verify-basic",
			Usage: "parse events and do basic syntax and structure checks",
		},
		&cli.BoolFlag{
			Name: "verify-sig",
			Usage: "verify account signatures on commits",
		},
		&cli.BoolFlag{
			Name: "verify-mst",
			Usage: "run inductive verification of ops and MST structure",
		},
		// Switches commit handling to one output line per record operation;
		// in this mode sync, identity, and account events are not handled.
		&cli.BoolFlag{
			Name: "ops",
			Aliases: []string{"records"},
			Usage: "instead of printing entire events, print individual record ops",
		},
	},
	Action: runFirehose,
}
81
82type GoatFirehoseConsumer struct {
83 OpsMode bool
84 AccountsOnly bool
85 Quiet bool
86 Blocks bool
87 VerifyBasic bool
88 VerifySig bool
89 VerifyMST bool
90 // filter to specified collections
91 CollectionFilter []string
92 // for signature verification
93 Dir identity.Directory
94}
95
96func runFirehose(cctx *cli.Context) error {
97 ctx := context.Background()
98
99 slog.SetDefault(configLogger(cctx, os.Stderr))
100
101 // main thing is skipping handle verification
102 bdir := identity.BaseDirectory{
103 SkipHandleVerification: true,
104 TryAuthoritativeDNS: false,
105 SkipDNSDomainSuffixes: []string{".bsky.social"},
106 UserAgent: *userAgent(),
107 }
108 cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5)
109
110 gfc := GoatFirehoseConsumer{
111 OpsMode: cctx.Bool("ops"),
112 AccountsOnly: cctx.Bool("account-events"),
113 CollectionFilter: cctx.StringSlice("collection"),
114 Quiet: cctx.Bool("quiet"),
115 Blocks: cctx.Bool("blocks"),
116 VerifyBasic: cctx.Bool("verify-basic"),
117 VerifySig: cctx.Bool("verify-sig"),
118 VerifyMST: cctx.Bool("verify-mst"),
119 Dir: &cdir,
120 }
121
122 var relayHost string
123 if cctx.IsSet("relay-host") {
124 if cctx.Args().Len() != 0 {
125 return errors.New("error: unused positional args")
126 }
127 relayHost = cctx.String("relay-host")
128 } else {
129 if cctx.Args().Len() == 1 {
130 relayHost = cctx.Args().First()
131 } else if cctx.Args().Len() > 1 {
132 return errors.New("can only have at most one relay-host")
133 } else {
134 relayHost = cctx.String("relay-host")
135 }
136 }
137
138 dialer := websocket.DefaultDialer
139 u, err := url.Parse(relayHost)
140 if err != nil {
141 return fmt.Errorf("invalid relayHost URI: %w", err)
142 }
143 switch u.Scheme {
144 case "http":
145 u.Scheme = "ws"
146 case "https":
147 u.Scheme = "wss"
148 }
149 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
150 if cctx.IsSet("cursor") {
151 u.RawQuery = fmt.Sprintf("cursor=%d", cctx.Int("cursor"))
152 }
153 urlString := u.String()
154 con, _, err := dialer.Dial(urlString, http.Header{
155 "User-Agent": []string{*userAgent()},
156 })
157 if err != nil {
158 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
159 }
160
161 rsc := &events.RepoStreamCallbacks{
162 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
163 //slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq)
164 if !gfc.AccountsOnly && !gfc.OpsMode {
165 return gfc.handleCommitEvent(ctx, evt)
166 } else if !gfc.AccountsOnly && gfc.OpsMode {
167 return gfc.handleCommitEventOps(ctx, evt)
168 }
169 return nil
170 },
171 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error {
172 //slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq)
173 if !gfc.AccountsOnly && !gfc.OpsMode {
174 return gfc.handleSyncEvent(ctx, evt)
175 }
176 return nil
177 },
178 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
179 //slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq)
180 if !gfc.OpsMode {
181 return gfc.handleIdentityEvent(ctx, evt)
182 }
183 return nil
184 },
185 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
186 //slog.Debug("account event", "did", evt.Did, "seq", evt.Seq)
187 if !gfc.OpsMode {
188 return gfc.handleAccountEvent(ctx, evt)
189 }
190 return nil
191 },
192 }
193
194 scheduler := parallel.NewScheduler(
195 1,
196 100,
197 relayHost,
198 rsc.EventHandler,
199 )
200 slog.Info("starting firehose consumer", "relayHost", relayHost)
201 return events.HandleRepoStream(ctx, con, scheduler, nil)
202}
203
204func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error {
205 if gfc.VerifySig {
206 did, err := syntax.ParseDID(evt.Did)
207 if err != nil {
208 return err
209 }
210 gfc.Dir.Purge(ctx, did.AtIdentifier())
211 }
212 if gfc.VerifyBasic {
213 if _, err := syntax.ParseDID(evt.Did); err != nil {
214 slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq)
215 }
216 }
217 if gfc.Quiet {
218 return nil
219 }
220 out := make(map[string]interface{})
221 out["type"] = "identity"
222 out["payload"] = evt
223 b, err := json.Marshal(out)
224 if err != nil {
225 return err
226 }
227 fmt.Println(string(b))
228 return nil
229}
230
231func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error {
232 if gfc.VerifyBasic {
233 if _, err := syntax.ParseDID(evt.Did); err != nil {
234 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
235 }
236 }
237 if gfc.Quiet {
238 return nil
239 }
240 out := make(map[string]interface{})
241 out["type"] = "account"
242 out["payload"] = evt
243 b, err := json.Marshal(out)
244 if err != nil {
245 return err
246 }
247 fmt.Println(string(b))
248 return nil
249}
250
251func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error {
252 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks))
253 if err != nil {
254 return err
255 }
256 if gfc.VerifyBasic {
257 if err := commit.VerifyStructure(); err != nil {
258 slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err)
259 }
260 if _, err := syntax.ParseDID(evt.Did); err != nil {
261 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
262 }
263 }
264 if gfc.Quiet {
265 return nil
266 }
267 if !gfc.Blocks {
268 evt.Blocks = nil
269 }
270 out := make(map[string]interface{})
271 out["type"] = "sync"
272 out["commit"] = commit.AsData() // NOTE: funky, but helpful, to include this in output
273 out["payload"] = evt
274 b, err := json.Marshal(out)
275 if err != nil {
276 return err
277 }
278 fmt.Println(string(b))
279 return nil
280}
281
282// this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks
283func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
284
285 if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST {
286
287 logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev)
288
289 did, err := syntax.ParseDID(evt.Repo)
290 if err != nil {
291 return err
292 }
293
294 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks))
295 if err != nil {
296 return err
297 }
298
299 if gfc.VerifySig {
300 ident, err := gfc.Dir.LookupDID(ctx, did)
301 if err != nil {
302 return err
303 }
304 pubkey, err := ident.PublicKey()
305 if err != nil {
306 return err
307 }
308 logger = logger.With("pds", ident.PDSEndpoint())
309 if err := commit.VerifySignature(pubkey); err != nil {
310 logger.Warn("commit signature validation failed", "err", err)
311 }
312 }
313
314 if len(evt.Blocks) == 0 {
315 logger.Warn("commit message missing blocks")
316 }
317
318 if gfc.VerifyBasic {
319 // the commit itself
320 if err := commit.VerifyStructure(); err != nil {
321 logger.Warn("bad commit object", "err", err)
322 }
323 // the event fields
324 rev, err := syntax.ParseTID(evt.Rev)
325 if err != nil {
326 logger.Warn("bad TID syntax in commit rev", "err", err)
327 }
328 if rev.String() != commit.Rev {
329 logger.Warn("event rev != commit rev", "commitRev", commit.Rev)
330 }
331 if did.String() != commit.DID {
332 logger.Warn("event DID != commit DID", "commitDID", commit.DID)
333 }
334 _, err = syntax.ParseDatetime(evt.Time)
335 if err != nil {
336 logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err)
337 }
338 if evt.TooBig {
339 logger.Warn("deprecated tooBig commit flag set")
340 }
341 if evt.Rebase {
342 logger.Warn("deprecated rebase commit flag set")
343 }
344 }
345
346 if gfc.VerifyMST {
347 if evt.PrevData == nil {
348 logger.Warn("prevData is nil, skipping MST check")
349 } else {
350 // TODO: break out this function in to smaller chunks
351 if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil {
352 logger.Warn("failed to invert commit MST", "err", err)
353 }
354 }
355 }
356 }
357
358 if gfc.Quiet {
359 return nil
360 }
361
362 // apply collections filter
363 if len(gfc.CollectionFilter) > 0 {
364 keep := false
365 for _, op := range evt.Ops {
366 parts := strings.SplitN(op.Path, "/", 3)
367 if len(parts) != 2 {
368 slog.Error("invalid record path", "path", op.Path)
369 return nil
370 }
371 collection := parts[0]
372 for _, c := range gfc.CollectionFilter {
373 if c == collection {
374 keep = true
375 break
376 }
377 }
378 if keep == true {
379 break
380 }
381 }
382 if !keep {
383 return nil
384 }
385 }
386
387 if !gfc.Blocks {
388 evt.Blocks = nil
389 }
390 out := make(map[string]interface{})
391 out["type"] = "commit"
392 out["payload"] = evt
393 b, err := json.Marshal(out)
394 if err != nil {
395 return err
396 }
397 fmt.Println(string(b))
398 return nil
399}
400
401func (gfc *GoatFirehoseConsumer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
402 logger := slog.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
403
404 if evt.TooBig {
405 logger.Warn("skipping tooBig events for now")
406 return nil
407 }
408
409 _, rr, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(evt.Blocks))
410 if err != nil {
411 logger.Error("failed to read repo from car", "err", err)
412 return nil
413 }
414
415 for _, op := range evt.Ops {
416 collection, rkey, err := syntax.ParseRepoPath(op.Path)
417 if err != nil {
418 logger.Error("invalid path in repo op", "eventKind", op.Action, "path", op.Path)
419 return nil
420 }
421 logger = logger.With("eventKind", op.Action, "collection", collection, "rkey", rkey)
422
423 if len(gfc.CollectionFilter) > 0 {
424 keep := false
425 for _, c := range gfc.CollectionFilter {
426 if collection.String() == c {
427 keep = true
428 break
429 }
430 }
431 if keep == false {
432 continue
433 }
434 }
435
436 out := make(map[string]interface{})
437 out["seq"] = evt.Seq
438 out["rev"] = evt.Rev
439 out["time"] = evt.Time
440 out["collection"] = collection
441 out["rkey"] = rkey
442
443 switch op.Action {
444 case "create", "update":
445 coll, rkey, err := syntax.ParseRepoPath(op.Path)
446 if err != nil {
447 return err
448 }
449 // read the record bytes from blocks, and verify CID
450 recBytes, rc, err := rr.GetRecordBytes(ctx, coll, rkey)
451 if err != nil {
452 logger.Error("reading record from event blocks (CAR)", "err", err)
453 break
454 }
455 if op.Cid == nil || lexutil.LexLink(*rc) != *op.Cid {
456 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
457 break
458 }
459
460 out["action"] = op.Action
461 d, err := data.UnmarshalCBOR(recBytes)
462 if err != nil {
463 slog.Warn("failed to parse record CBOR")
464 continue
465 }
466 out["cid"] = op.Cid.String()
467 out["record"] = d
468 b, err := json.Marshal(out)
469 if err != nil {
470 return err
471 }
472 if !gfc.Quiet {
473 fmt.Println(string(b))
474 }
475 case "delete":
476 out["action"] = "delete"
477 b, err := json.Marshal(out)
478 if err != nil {
479 return err
480 }
481 if !gfc.Quiet {
482 fmt.Println(string(b))
483 }
484 default:
485 logger.Error("unexpected record op kind")
486 }
487 }
488 return nil
489}