porting all github actions from bluesky-social/indigo to tangled CI
at main 13 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "net/http" 11 "net/url" 12 "os" 13 "strings" 14 "time" 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 "github.com/bluesky-social/indigo/atproto/data" 18 "github.com/bluesky-social/indigo/atproto/identity" 19 "github.com/bluesky-social/indigo/atproto/repo" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 "github.com/bluesky-social/indigo/events" 22 "github.com/bluesky-social/indigo/events/schedulers/parallel" 23 lexutil "github.com/bluesky-social/indigo/lex/util" 24 25 "github.com/gorilla/websocket" 26 "github.com/urfave/cli/v2" 27) 28 29var cmdFirehose = &cli.Command{ 30 Name: "firehose", 31 Usage: "stream repo and identity events", 32 Flags: []cli.Flag{ 33 &cli.StringFlag{ 34 Name: "relay-host", 35 Usage: "method, hostname, and port of Relay instance (websocket)", 36 Value: "wss://bsky.network", 37 EnvVars: []string{"ATP_RELAY_HOST", "RELAY_HOST"}, 38 }, 39 &cli.IntFlag{ 40 Name: "cursor", 41 Usage: "cursor to consume at", 42 }, 43 &cli.StringSliceFlag{ 44 Name: "collection", 45 Aliases: []string{"c"}, 46 Usage: "filter to specific record types (NSID)", 47 }, 48 &cli.BoolFlag{ 49 Name: "account-events", 50 Usage: "only print account and identity events", 51 }, 52 &cli.BoolFlag{ 53 Name: "blocks", 54 Usage: "include blocks as base64 in payload", 55 }, 56 &cli.BoolFlag{ 57 Name: "quiet", 58 Aliases: []string{"q"}, 59 Usage: "don't actually print events to stdout (eg, errors only)", 60 }, 61 &cli.BoolFlag{ 62 Name: "verify-basic", 63 Usage: "parse events and do basic syntax and structure checks", 64 }, 65 &cli.BoolFlag{ 66 Name: "verify-sig", 67 Usage: "verify account signatures on commits", 68 }, 69 &cli.BoolFlag{ 70 Name: "verify-mst", 71 Usage: "run inductive verification of ops and MST structure", 72 }, 73 &cli.BoolFlag{ 74 Name: "ops", 75 Aliases: []string{"records"}, 76 Usage: "instead of printing entire events, print individual record ops", 77 }, 78 }, 79 Action: runFirehose, 80} 81 82type GoatFirehoseConsumer struct { 83 OpsMode bool 84 AccountsOnly bool 85 Quiet bool 86 Blocks bool 87 VerifyBasic bool 88 VerifySig bool 89 VerifyMST bool 90 // filter to specified collections 91 CollectionFilter []string 92 // for signature verification 93 Dir identity.Directory 94} 95 96func runFirehose(cctx *cli.Context) error { 97 ctx := context.Background() 98 99 slog.SetDefault(configLogger(cctx, os.Stderr)) 100 101 // main thing is skipping handle verification 102 bdir := identity.BaseDirectory{ 103 SkipHandleVerification: true, 104 TryAuthoritativeDNS: false, 105 SkipDNSDomainSuffixes: []string{".bsky.social"}, 106 UserAgent: *userAgent(), 107 } 108 cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 109 110 gfc := GoatFirehoseConsumer{ 111 OpsMode: cctx.Bool("ops"), 112 AccountsOnly: cctx.Bool("account-events"), 113 CollectionFilter: cctx.StringSlice("collection"), 114 Quiet: cctx.Bool("quiet"), 115 Blocks: cctx.Bool("blocks"), 116 VerifyBasic: cctx.Bool("verify-basic"), 117 VerifySig: cctx.Bool("verify-sig"), 118 VerifyMST: cctx.Bool("verify-mst"), 119 Dir: &cdir, 120 } 121 122 var relayHost string 123 if cctx.IsSet("relay-host") { 124 if cctx.Args().Len() != 0 { 125 return errors.New("error: unused positional args") 126 } 127 relayHost = cctx.String("relay-host") 128 } else { 129 if cctx.Args().Len() == 1 { 130 relayHost = cctx.Args().First() 131 } else if cctx.Args().Len() > 1 { 132 return errors.New("can only have at most one relay-host") 133 } else { 134 relayHost = cctx.String("relay-host") 135 } 136 } 137 138 dialer := websocket.DefaultDialer 139 u, err := url.Parse(relayHost) 140 if err != nil { 141 return fmt.Errorf("invalid relayHost URI: %w", err) 142 } 143 switch u.Scheme { 144 case "http": 145 u.Scheme = "ws" 146 case "https": 147 u.Scheme = "wss" 148 } 149 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 150 if cctx.IsSet("cursor") { 151 u.RawQuery = fmt.Sprintf("cursor=%d", cctx.Int("cursor")) 152 } 153 urlString := u.String() 154 con, _, err := dialer.Dial(urlString, http.Header{ 155 "User-Agent": []string{*userAgent()}, 156 }) 157 if err != nil { 158 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 159 } 160 161 rsc := &events.RepoStreamCallbacks{ 162 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 163 //slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq) 164 if !gfc.AccountsOnly && !gfc.OpsMode { 165 return gfc.handleCommitEvent(ctx, evt) 166 } else if !gfc.AccountsOnly && gfc.OpsMode { 167 return gfc.handleCommitEventOps(ctx, evt) 168 } 169 return nil 170 }, 171 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 172 //slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq) 173 if !gfc.AccountsOnly && !gfc.OpsMode { 174 return gfc.handleSyncEvent(ctx, evt) 175 } 176 return nil 177 }, 178 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 179 //slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq) 180 if !gfc.OpsMode { 181 return gfc.handleIdentityEvent(ctx, evt) 182 } 183 return nil 184 }, 185 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 186 //slog.Debug("account event", "did", evt.Did, "seq", evt.Seq) 187 if !gfc.OpsMode { 188 return gfc.handleAccountEvent(ctx, evt) 189 } 190 return nil 191 }, 192 } 193 194 scheduler := parallel.NewScheduler( 195 1, 196 100, 197 relayHost, 198 rsc.EventHandler, 199 ) 200 slog.Info("starting firehose consumer", "relayHost", relayHost) 201 return events.HandleRepoStream(ctx, con, scheduler, nil) 202} 203 204func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error { 205 if gfc.VerifySig { 206 did, err := syntax.ParseDID(evt.Did) 207 if err != nil { 208 return err 209 } 210 gfc.Dir.Purge(ctx, did.AtIdentifier()) 211 } 212 if gfc.VerifyBasic { 213 if _, err := syntax.ParseDID(evt.Did); err != nil { 214 slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq) 215 } 216 } 217 if gfc.Quiet { 218 return nil 219 } 220 out := make(map[string]interface{}) 221 out["type"] = "identity" 222 out["payload"] = evt 223 b, err := json.Marshal(out) 224 if err != nil { 225 return err 226 } 227 fmt.Println(string(b)) 228 return nil 229} 230 231func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error { 232 if gfc.VerifyBasic { 233 if _, err := syntax.ParseDID(evt.Did); err != nil { 234 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq) 235 } 236 } 237 if gfc.Quiet { 238 return nil 239 } 240 out := make(map[string]interface{}) 241 out["type"] = "account" 242 out["payload"] = evt 243 b, err := json.Marshal(out) 244 if err != nil { 245 return err 246 } 247 fmt.Println(string(b)) 248 return nil 249} 250 251func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 252 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 253 if err != nil { 254 return err 255 } 256 if gfc.VerifyBasic { 257 if err := commit.VerifyStructure(); err != nil { 258 slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err) 259 } 260 if _, err := syntax.ParseDID(evt.Did); err != nil { 261 slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq) 262 } 263 } 264 if gfc.Quiet { 265 return nil 266 } 267 if !gfc.Blocks { 268 evt.Blocks = nil 269 } 270 out := make(map[string]interface{}) 271 out["type"] = "sync" 272 out["commit"] = commit.AsData() // NOTE: funky, but helpful, to include this in output 273 out["payload"] = evt 274 b, err := json.Marshal(out) 275 if err != nil { 276 return err 277 } 278 fmt.Println(string(b)) 279 return nil 280} 281 282// this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks 283func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 284 285 if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST { 286 287 logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev) 288 289 did, err := syntax.ParseDID(evt.Repo) 290 if err != nil { 291 return err 292 } 293 294 commit, _, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 295 if err != nil { 296 return err 297 } 298 299 if gfc.VerifySig { 300 ident, err := gfc.Dir.LookupDID(ctx, did) 301 if err != nil { 302 return err 303 } 304 pubkey, err := ident.PublicKey() 305 if err != nil { 306 return err 307 } 308 logger = logger.With("pds", ident.PDSEndpoint()) 309 if err := commit.VerifySignature(pubkey); err != nil { 310 logger.Warn("commit signature validation failed", "err", err) 311 } 312 } 313 314 if len(evt.Blocks) == 0 { 315 logger.Warn("commit message missing blocks") 316 } 317 318 if gfc.VerifyBasic { 319 // the commit itself 320 if err := commit.VerifyStructure(); err != nil { 321 logger.Warn("bad commit object", "err", err) 322 } 323 // the event fields 324 rev, err := syntax.ParseTID(evt.Rev) 325 if err != nil { 326 logger.Warn("bad TID syntax in commit rev", "err", err) 327 } 328 if rev.String() != commit.Rev { 329 logger.Warn("event rev != commit rev", "commitRev", commit.Rev) 330 } 331 if did.String() != commit.DID { 332 logger.Warn("event DID != commit DID", "commitDID", commit.DID) 333 } 334 _, err = syntax.ParseDatetime(evt.Time) 335 if err != nil { 336 logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err) 337 } 338 if evt.TooBig { 339 logger.Warn("deprecated tooBig commit flag set") 340 } 341 if evt.Rebase { 342 logger.Warn("deprecated rebase commit flag set") 343 } 344 } 345 346 if gfc.VerifyMST { 347 if evt.PrevData == nil { 348 logger.Warn("prevData is nil, skipping MST check") 349 } else { 350 // TODO: break out this function in to smaller chunks 351 if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil { 352 logger.Warn("failed to invert commit MST", "err", err) 353 } 354 } 355 } 356 } 357 358 if gfc.Quiet { 359 return nil 360 } 361 362 // apply collections filter 363 if len(gfc.CollectionFilter) > 0 { 364 keep := false 365 for _, op := range evt.Ops { 366 parts := strings.SplitN(op.Path, "/", 3) 367 if len(parts) != 2 { 368 slog.Error("invalid record path", "path", op.Path) 369 return nil 370 } 371 collection := parts[0] 372 for _, c := range gfc.CollectionFilter { 373 if c == collection { 374 keep = true 375 break 376 } 377 } 378 if keep == true { 379 break 380 } 381 } 382 if !keep { 383 return nil 384 } 385 } 386 387 if !gfc.Blocks { 388 evt.Blocks = nil 389 } 390 out := make(map[string]interface{}) 391 out["type"] = "commit" 392 out["payload"] = evt 393 b, err := json.Marshal(out) 394 if err != nil { 395 return err 396 } 397 fmt.Println(string(b)) 398 return nil 399} 400 401func (gfc *GoatFirehoseConsumer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 402 logger := slog.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 403 404 if evt.TooBig { 405 logger.Warn("skipping tooBig events for now") 406 return nil 407 } 408 409 _, rr, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(evt.Blocks)) 410 if err != nil { 411 logger.Error("failed to read repo from car", "err", err) 412 return nil 413 } 414 415 for _, op := range evt.Ops { 416 collection, rkey, err := syntax.ParseRepoPath(op.Path) 417 if err != nil { 418 logger.Error("invalid path in repo op", "eventKind", op.Action, "path", op.Path) 419 return nil 420 } 421 logger = logger.With("eventKind", op.Action, "collection", collection, "rkey", rkey) 422 423 if len(gfc.CollectionFilter) > 0 { 424 keep := false 425 for _, c := range gfc.CollectionFilter { 426 if collection.String() == c { 427 keep = true 428 break 429 } 430 } 431 if keep == false { 432 continue 433 } 434 } 435 436 out := make(map[string]interface{}) 437 out["seq"] = evt.Seq 438 out["rev"] = evt.Rev 439 out["time"] = evt.Time 440 out["collection"] = collection 441 out["rkey"] = rkey 442 443 switch op.Action { 444 case "create", "update": 445 coll, rkey, err := syntax.ParseRepoPath(op.Path) 446 if err != nil { 447 return err 448 } 449 // read the record bytes from blocks, and verify CID 450 recBytes, rc, err := rr.GetRecordBytes(ctx, coll, rkey) 451 if err != nil { 452 logger.Error("reading record from event blocks (CAR)", "err", err) 453 break 454 } 455 if op.Cid == nil || lexutil.LexLink(*rc) != *op.Cid { 456 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 457 break 458 } 459 460 out["action"] = op.Action 461 d, err := data.UnmarshalCBOR(recBytes) 462 if err != nil { 463 slog.Warn("failed to parse record CBOR") 464 continue 465 } 466 out["cid"] = op.Cid.String() 467 out["record"] = d 468 b, err := json.Marshal(out) 469 if err != nil { 470 return err 471 } 472 if !gfc.Quiet { 473 fmt.Println(string(b)) 474 } 475 case "delete": 476 out["action"] = "delete" 477 b, err := json.Marshal(out) 478 if err != nil { 479 return err 480 } 481 if !gfc.Quiet { 482 fmt.Println(string(b)) 483 } 484 default: 485 logger.Error("unexpected record op kind") 486 } 487 } 488 return nil 489}