https://github.com/bluesky-social/goat but with tangled's CI
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at ci 488 lines 13 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "net/http" 11 "net/url" 12 "os" 13 "strings" 14 "time" 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 "github.com/bluesky-social/indigo/atproto/data" 18 "github.com/bluesky-social/indigo/atproto/identity" 19 "github.com/bluesky-social/indigo/atproto/repo" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 "github.com/bluesky-social/indigo/events" 22 "github.com/bluesky-social/indigo/events/schedulers/parallel" 23 lexutil "github.com/bluesky-social/indigo/lex/util" 24 25 "github.com/gorilla/websocket" 26 "github.com/urfave/cli/v3" 27) 28 29var cmdFirehose = &cli.Command{ 30 Name: "firehose", 31 Usage: "stream repo and identity events", 32 Flags: []cli.Flag{ 33 &cli.StringFlag{ 34 Name: "relay-host", 35 Usage: "method, hostname, and port of Relay instance (websocket)", 36 Value: "wss://bsky.network", 37 Sources: cli.EnvVars("ATP_RELAY_HOST", "RELAY_HOST"), 38 }, 39 &cli.IntFlag{ 40 Name: "cursor", 41 Usage: "cursor to consume at", 42 }, 43 &cli.StringSliceFlag{ 44 Name: "collection", 45 Aliases: []string{"c"}, 46 Usage: "filter to specific record types (NSID)", 47 }, 48 &cli.BoolFlag{ 49 Name: "account-events", 50 Usage: "only print account and identity events", 51 }, 52 &cli.BoolFlag{ 53 Name: "blocks", 54 Usage: "include blocks as base64 in payload", 55 }, 56 &cli.BoolFlag{ 57 Name: "quiet", 58 Aliases: []string{"q"}, 59 Usage: "don't actually print events to stdout (eg, errors only)", 60 }, 61 &cli.BoolFlag{ 62 Name: "verify-basic", 63 Usage: "parse events and do basic syntax and structure checks", 64 }, 65 &cli.BoolFlag{ 66 Name: "verify-sig", 67 Usage: "verify account signatures on commits", 68 }, 69 &cli.BoolFlag{ 70 Name: "verify-mst", 71 Usage: "run inductive verification of ops and MST structure", 72 }, 73 &cli.BoolFlag{ 74 Name: "ops", 75 Aliases: []string{"records"}, 76 Usage: "instead of printing entire events, print individual record ops", 77 }, 78 }, 79 Action: runFirehose, 80} 81 82type GoatFirehoseConsumer struct { 83 OpsMode bool 84 AccountsOnly bool 85 Quiet bool 86 Blocks bool 87 VerifyBasic bool 88 VerifySig bool 89 VerifyMST bool 90 // filter to specified collections 91 CollectionFilter []string 92 // for signature verification 93 Dir identity.Directory 94} 95 96func runFirehose(ctx context.Context, cmd *cli.Command) error { 97 98 slog.SetDefault(configLogger(cmd, os.Stderr)) 99 100 // main thing is skipping handle verification 101 bdir := identity.BaseDirectory{ 102 SkipHandleVerification: true, 103 TryAuthoritativeDNS: false, 104 SkipDNSDomainSuffixes: []string{".bsky.social"}, 105 UserAgent: *userAgent(), 106 } 107 cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 108 109 gfc := GoatFirehoseConsumer{ 110 OpsMode: cmd.Bool("ops"), 111 AccountsOnly: cmd.Bool("account-events"), 112 CollectionFilter: cmd.StringSlice("collection"), 113 Quiet: cmd.Bool("quiet"), 114 Blocks: cmd.Bool("blocks"), 115 VerifyBasic: cmd.Bool("verify-basic"), 116 VerifySig: cmd.Bool("verify-sig"), 117 VerifyMST: cmd.Bool("verify-mst"), 118 Dir: &cdir, 119 } 120 121 var relayHost string 122 if cmd.IsSet("relay-host") { 123 if cmd.Args().Len() != 0 { 124 return errors.New("error: unused positional args") 125 } 126 relayHost = cmd.String("relay-host") 127 } else { 128 if cmd.Args().Len() == 1 { 129 relayHost = cmd.Args().First() 130 } else if cmd.Args().Len() > 1 { 131 return errors.New("can only have at most one relay-host") 132 } else { 133 relayHost = cmd.String("relay-host") 134 } 135 } 136 137 dialer := websocket.DefaultDialer 138 u, err := url.Parse(relayHost) 139 if err != nil { 140 return fmt.Errorf("invalid relayHost URI: %w", err) 141 } 142 switch u.Scheme { 143 case "http": 144 u.Scheme = "ws" 145 case "https": 146 u.Scheme = "wss" 147 } 148 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 149 if cmd.IsSet("cursor") { 150 u.RawQuery = fmt.Sprintf("cursor=%d", cmd.Int("cursor")) 151 } 152 urlString := u.String() 153 con, _, err := dialer.Dial(urlString, http.Header{ 154 "User-Agent": []string{*userAgent()}, 155 }) 156 if err != nil { 157 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 158 } 159 160 rsc := &events.RepoStreamCallbacks{ 161 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 162 //slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq) 163 if !gfc.AccountsOnly && !gfc.OpsMode { 164 return gfc.handleCommitEvent(ctx, evt) 165 } else if !gfc.AccountsOnly && gfc.OpsMode { 166 return gfc.handleCommitEventOps(ctx, evt) 167 } 168 return nil 169 }, 170 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 171 //slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq) 172 if !gfc.AccountsOnly && !gfc.OpsMode { 173 return gfc.handleSyncEvent(ctx, evt) 174 } 175 return nil 176 }, 177 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 178 //slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq) 179 if !gfc.OpsMode { 180 return gfc.handleIdentityEvent(ctx, evt) 181 } 182 return nil 183 }, 184 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 185 //slog.Debug("account event", "did", evt.Did, "seq", evt.Seq) 186 if !gfc.OpsMode { 187 return gfc.handleAccountEvent(ctx, evt) 188 } 189 return nil 190 }, 191 } 192 193 scheduler := parallel.NewScheduler( 194 1, 195 100, 196 relayHost, 197 rsc.EventHandler, 198 ) 199 slog.Info("starting firehose consumer", "relayHost", relayHost) 200 return events.HandleRepoStream(ctx, con, scheduler, nil) 201} 202 203func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error { 204 if gfc.VerifySig { 205 did, err := syntax.ParseDID(evt.Did) 206 if err != nil { 207 return err 208 } 209 gfc.Dir.Purge(ctx, did.AtIdentifier()) 210 } 211 if gfc.VerifyBasic { 212 if _, err := syntax.ParseDID(evt.Did); err != nil { 213 slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq) 214 } 215 } 216 if gfc.Quiet { 217 return nil 218 } 219 out := make(map[string]interface{}) 220 out["type"] = "identity" 221 out["payload"] = evt 222 b, err := json.Marshal(out) 223 if err != nil { 224 return err 225 } 226 fmt.Println(string(b)) 227 return nil 228} 229 230func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error { 231 if gfc.VerifyBasic { 232 if _, err := syntax.ParseDID(evt.Did); err != nil { 233 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq) 234 } 235 } 236 if gfc.Quiet { 237 return nil 238 } 239 out := make(map[string]interface{}) 240 out["type"] = "account" 241 out["payload"] = evt 242 b, err := json.Marshal(out) 243 if err != nil { 244 return err 245 } 246 fmt.Println(string(b)) 247 return nil 248} 249 250func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 251 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 252 if err != nil { 253 return err 254 } 255 if gfc.VerifyBasic { 256 if err := commit.VerifyStructure(); err != nil { 257 slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err) 258 } 259 if _, err := syntax.ParseDID(evt.Did); err != nil { 260 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq) 261 } 262 } 263 if gfc.Quiet { 264 return nil 265 } 266 if !gfc.Blocks { 267 evt.Blocks = nil 268 } 269 out := make(map[string]interface{}) 270 out["type"] = "sync" 271 out["commit"] = commit.AsData() // NOTE: funky, but helpful, to include this in output 272 out["payload"] = evt 273 b, err := json.Marshal(out) 274 if err != nil { 275 return err 276 } 277 fmt.Println(string(b)) 278 return nil 279} 280 281// this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks 282func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 283 284 if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST { 285 286 logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev) 287 288 did, err := syntax.ParseDID(evt.Repo) 289 if err != nil { 290 return err 291 } 292 293 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 294 if err != nil { 295 return err 296 } 297 298 if gfc.VerifySig { 299 ident, err := gfc.Dir.LookupDID(ctx, did) 300 if err != nil { 301 return err 302 } 303 pubkey, err := ident.PublicKey() 304 if err != nil { 305 return err 306 } 307 logger = logger.With("pds", ident.PDSEndpoint()) 308 if err := commit.VerifySignature(pubkey); err != nil { 309 logger.Warn("commit signature validation failed", "err", err) 310 } 311 } 312 313 if len(evt.Blocks) == 0 { 314 logger.Warn("commit message missing blocks") 315 } 316 317 if gfc.VerifyBasic { 318 // the commit itself 319 if err := commit.VerifyStructure(); err != nil { 320 logger.Warn("bad commit object", "err", err) 321 } 322 // the event fields 323 rev, err := syntax.ParseTID(evt.Rev) 324 if err != nil { 325 logger.Warn("bad TID syntax in commit rev", "err", err) 326 } 327 if rev.String() != commit.Rev { 328 logger.Warn("event rev != commit rev", "commitRev", commit.Rev) 329 } 330 if did.String() != commit.DID { 331 logger.Warn("event DID != commit DID", "commitDID", commit.DID) 332 } 333 _, err = syntax.ParseDatetime(evt.Time) 334 if err != nil { 335 logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err) 336 } 337 if evt.TooBig { 338 logger.Warn("deprecated tooBig commit flag set") 339 } 340 if evt.Rebase { 341 logger.Warn("deprecated rebase commit flag set") 342 } 343 } 344 345 if gfc.VerifyMST { 346 if evt.PrevData == nil { 347 logger.Warn("prevData is nil, skipping MST check") 348 } else { 349 // TODO: break out this function in to smaller chunks 350 if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil { 351 logger.Warn("failed to invert commit MST", "err", err) 352 } 353 } 354 } 355 } 356 357 if gfc.Quiet { 358 return nil 359 } 360 361 // apply collections filter 362 if len(gfc.CollectionFilter) > 0 { 363 keep := false 364 for _, op := range evt.Ops { 365 parts := strings.SplitN(op.Path, "/", 3) 366 if len(parts) != 2 { 367 slog.Error("invalid record path", "path", op.Path) 368 return nil 369 } 370 collection := parts[0] 371 for _, c := range gfc.CollectionFilter { 372 if c == collection { 373 keep = true 374 break 375 } 376 } 377 if keep { 378 break 379 } 380 } 381 if !keep { 382 return nil 383 } 384 } 385 386 if !gfc.Blocks { 387 evt.Blocks = nil 388 } 389 out := make(map[string]interface{}) 390 out["type"] = "commit" 391 out["payload"] = evt 392 b, err := json.Marshal(out) 393 if err != nil { 394 return err 395 } 396 fmt.Println(string(b)) 397 return nil 398} 399 400func (gfc *GoatFirehoseConsumer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 401 logger := slog.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 402 403 if evt.TooBig { 404 logger.Warn("skipping tooBig events for now") 405 return nil 406 } 407 408 _, rr, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(evt.Blocks)) 409 if err != nil { 410 logger.Error("failed to read repo from car", "err", err) 411 return nil 412 } 413 414 for _, op := range evt.Ops { 415 collection, rkey, err := syntax.ParseRepoPath(op.Path) 416 if err != nil { 417 logger.Error("invalid path in repo op", "eventKind", op.Action, "path", op.Path) 418 return nil 419 } 420 logger = logger.With("eventKind", op.Action, "collection", collection, "rkey", rkey) 421 422 if len(gfc.CollectionFilter) > 0 { 423 keep := false 424 for _, c := range gfc.CollectionFilter { 425 if collection.String() == c { 426 keep = true 427 break 428 } 429 } 430 if !keep { 431 continue 432 } 433 } 434 435 out := make(map[string]interface{}) 436 out["seq"] = evt.Seq 437 out["rev"] = evt.Rev 438 out["time"] = evt.Time 439 out["collection"] = collection 440 out["rkey"] = rkey 441 442 switch op.Action { 443 case "create", "update": 444 coll, rkey, err := syntax.ParseRepoPath(op.Path) 445 if err != nil { 446 return err 447 } 448 // read the record bytes from blocks, and verify CID 449 recBytes, rc, err := rr.GetRecordBytes(ctx, coll, rkey) 450 if err != nil { 451 logger.Error("reading record from event blocks (CAR)", "err", err) 452 break 453 } 454 if op.Cid == nil || lexutil.LexLink(*rc) != *op.Cid { 455 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 456 break 457 } 458 459 out["action"] = op.Action 460 d, err := data.UnmarshalCBOR(recBytes) 461 if err != nil { 462 slog.Warn("failed to parse record CBOR") 463 continue 464 } 465 out["cid"] = op.Cid.String() 466 out["record"] = d 467 b, err := json.Marshal(out) 468 if err != nil { 469 return err 470 } 471 if !gfc.Quiet { 472 fmt.Println(string(b)) 473 } 474 case "delete": 475 out["action"] = "delete" 476 b, err := json.Marshal(out) 477 if err != nil { 478 return err 479 } 480 if !gfc.Quiet { 481 fmt.Println(string(b)) 482 } 483 default: 484 logger.Error("unexpected record op kind") 485 } 486 } 487 return nil 488}