porting all github actions from bluesky-social/indigo to tangled CI
at main 21 kB view raw
1package main 2 3import ( 4 "context" 5 "crypto/rand" 6 "fmt" 7 "log" 8 "log/slog" 9 "net" 10 "os" 11 "os/signal" 12 "path/filepath" 13 "strings" 14 "sync" 15 "syscall" 16 "time" 17 18 "net/http" 19 _ "net/http/pprof" 20 21 "github.com/bluesky-social/indigo/api/atproto" 22 comatproto "github.com/bluesky-social/indigo/api/atproto" 23 "github.com/bluesky-social/indigo/api/bsky" 24 "github.com/bluesky-social/indigo/carstore" 25 "github.com/bluesky-social/indigo/did" 26 "github.com/bluesky-social/indigo/events" 27 "github.com/bluesky-social/indigo/events/yolopersist" 28 "github.com/bluesky-social/indigo/indexer" 29 "github.com/bluesky-social/indigo/models" 30 "github.com/bluesky-social/indigo/plc" 31 petname "github.com/dustinkirkland/golang-petname" 32 "github.com/icrowley/fake" 33 "github.com/labstack/echo-contrib/pprof" 34 "github.com/urfave/cli/v2" 35 godid "github.com/whyrusleeping/go-did" 36 "golang.org/x/crypto/acme/autocert" 37 "golang.org/x/time/rate" 38 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 "github.com/bluesky-social/indigo/repomgr" 41 "github.com/bluesky-social/indigo/util" 42 "github.com/gorilla/websocket" 43 "github.com/labstack/echo/v4" 44 "github.com/labstack/echo/v4/middleware" 45 "github.com/prometheus/client_golang/prometheus" 46 "github.com/prometheus/client_golang/prometheus/promauto" 47 "github.com/prometheus/client_golang/prometheus/promhttp" 48 _ "go.uber.org/automaxprocs" 49 "gorm.io/driver/sqlite" 50 "gorm.io/gorm" 51 52 "github.com/carlmjohnson/versioninfo" 53 cbg "github.com/whyrusleeping/cbor-gen" 54) 55 56var eventsGeneratedCounter = promauto.NewCounter(prometheus.CounterOpts{ 57 Name: "supercollider_events_generated_total", 58 Help: "The total number of events generated", 59}) 60 61var eventsSentCounter = promauto.NewCounter(prometheus.CounterOpts{ 62 Name: "supercollider_events_sent_total", 63 Help: "The total number of events sent", 64}) 65 66type Server struct { 67 Events *events.EventManager 68 Dids []string 69 Host string 70 EnableSSL bool 71 Logger *slog.Logger 72 EventControl chan string 73 MultibaseKey string 74 RepoManager *repomgr.RepoManager 75 76 // Event Loop Parameters 77 TotalDesiredEvents int 78 MaxEventsPerSecond int 79 80 PlaybackFile string 81} 82 83func main() { 84 ctx := context.Background() 85 ctx, cancel := context.WithCancel(ctx) 86 defer cancel() 87 88 app := cli.App{ 89 Name: "supercollider", 90 Usage: "atproto event noise-maker for Relay load testing", 91 Version: versioninfo.Short(), 92 } 93 94 app.Flags = []cli.Flag{ 95 &cli.StringFlag{ 96 Name: "hostname", 97 Usage: "hostname of this server (forward *.hostname DNS records to this server)", 98 Value: "supercollider.jazco.io", 99 EnvVars: []string{"SUPERCOLLIDER_HOST"}, 100 }, 101 &cli.BoolFlag{ 102 Name: "use-ssl", 103 Usage: "listen on port 443 and use SSL (needs to be run as root and have external DNS setup)", 104 Value: false, 105 EnvVars: []string{"SUPERCOLLIDER_USE_SSL"}, 106 }, 107 &cli.IntFlag{ 108 Name: "port", 109 Usage: "port for the HTTP(S) server to listen on (defaults to 80 if not using SSL, 443 if using SSL)", 110 EnvVars: []string{"SUPERCOLLIDER_PORT"}, 111 }, 112 113 &cli.StringFlag{ 114 Name: "key-file", 115 Usage: "file to store the private key used to sign events", 116 Value: "key.raw", 117 EnvVars: []string{"KEY_FILE"}, 118 }, 119 } 120 121 app.Commands = []*cli.Command{ 122 { 123 Name: "reload", 124 Usage: "reload events from a file and write them to an output file", 125 Action: Reload, 126 Flags: append([]cli.Flag{ 127 &cli.IntFlag{ 128 Name: "num-users", 129 Usage: "number of fake users to produce events for", 130 Value: 100, 131 EnvVars: []string{"NUM_USERS"}, 132 }, 133 &cli.IntFlag{ 134 Name: "total-events", 135 Usage: "total number of events to generate", 136 Value: 1_000_000, 137 EnvVars: []string{"TOTAL_EVENTS"}, 138 }, 139 &cli.StringFlag{ 140 Name: "output-file", 141 Usage: "output file for the generated events", 142 Value: "events_out.cbor", 143 EnvVars: []string{"OUTPUT_FILE"}, 144 }, 145 }, app.Flags...), 146 }, 147 { 148 Name: "fire", 149 Usage: "fire events from a file over a websocket", 150 Action: Fire, 151 Flags: append([]cli.Flag{ 152 &cli.IntFlag{ 153 Name: "events-per-second", 154 Usage: "maximum number of events to generate per second", 155 Value: 300, 156 EnvVars: []string{"EVENTS_PER_SECOND"}, 157 }, 158 &cli.StringFlag{ 159 Name: "input-file", 160 Usage: "input file for the generated events (if set, will read events from this file instead of generating them)", 161 Value: "events_in.cbor", 162 EnvVars: []string{"INPUT_FILE"}, 163 }, 164 }, app.Flags...), 165 }, 166 } 167 168 err := app.Run(os.Args) 169 if err != nil { 170 log.Fatal(err) 171 } 172} 173 174func Reload(cctx *cli.Context) error { 175 ctx := cctx.Context 176 ctx, cancel := context.WithCancel(ctx) 177 defer cancel() 178 179 // Trap SIGINT to trigger a shutdown. 180 signals := make(chan os.Signal, 1) 181 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 182 183 go func() { 184 select { 185 case <-signals: 186 cancel() 187 fmt.Println("shutting down on signal") 188 // Give the server some time to shutdown gracefully, then exit. 189 time.Sleep(time.Second * 5) 190 os.Exit(0) 191 case <-ctx.Done(): 192 fmt.Println("shutting down on context done") 193 } 194 }() 195 196 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 197 Level: slog.LevelInfo, 198 })) 199 defer func() { 200 logger.Info("main function teardown") 201 }() 202 203 logger = logger.With("source", "supercollider_main") 204 205 logger.Info("Starting Supercollider in Reload Mode") 206 logger.Info(fmt.Sprintf("Generating %d total events and writing them to %s", 207 cctx.Int("total-events"), cctx.String("output-file"))) 208 209 em := events.NewEventManager(yolopersist.NewYoloPersister()) 210 211 // Try to read the key from disk 212 keyBytes, err := os.ReadFile(cctx.String("key-file")) 213 if err != nil { 214 logger.Warn("failed to read key from disk, creating new key", "err", err.Error()) 215 } 216 217 var privkey *godid.PrivKey 218 if len(keyBytes) == 0 { 219 privkey, err = godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1) 220 if err != nil { 221 log.Fatalf("failed to generate privkey: %+v\n", err) 222 } 223 rawKey, err := privkey.RawBytes() 224 if err != nil { 225 log.Fatalf("failed to serialize privkey: %+v\n", err) 226 } 227 err = os.WriteFile(cctx.String("key-file"), rawKey, 0644) 228 if err != nil { 229 log.Fatalf("failed to write privkey to disk: %+v\n", err) 230 } 231 } else { 232 privkey, err = godid.PrivKeyFromRawBytes(godid.KeyTypeSecp256k1, keyBytes) 233 if err != nil { 234 log.Fatalf("failed to parse privkey from disk: %+v\n", err) 235 } 236 } 237 238 // Configure the repomanager and keypair for our fake accounts 239 repoman, privkey, err := initSpeedyRepoMan(privkey) 240 if err != nil { 241 log.Fatalf("failed to init repo manager: %+v\n", err) 242 } 243 244 vMethod, err := godid.VerificationMethodFromKey(privkey.Public()) 245 if err != nil { 246 log.Fatalf("failed to generate verification method: %+v\n", err) 247 } 248 249 // Initialize fake account DIDs 250 dids := []string{} 251 for i := 0; i < cctx.Int("num-users"); i++ { 252 did := fmt.Sprintf("did:web:%s.%s", petname.Generate(4, "-"), cctx.String("hostname")) 253 dids = append(dids, did) 254 } 255 256 // Instantiate Server 257 s := &Server{ 258 Logger: logger, 259 EnableSSL: cctx.Bool("use-ssl"), 260 Host: cctx.String("hostname"), 261 262 RepoManager: repoman, 263 MultibaseKey: *vMethod.PublicKeyMultibase, 264 Dids: dids, 265 266 Events: em, 267 TotalDesiredEvents: cctx.Int("total-events"), 268 } 269 270 repoman.SetEventHandler(s.HandleRepoEvent, false) 271 272 // HTTP Server setup and Middleware Plumbing 273 e := echo.New() 274 e.AutoTLSManager.Cache = autocert.DirCache("/var/www/.cache") 275 pprof.Register(e) 276 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 277 Format: "method=${method}, ip=${remote_ip}, uri=${uri}, status=${status} latency=${latency_human} (ua=${user_agent})\n", 278 })) 279 280 e.GET("/", func(c echo.Context) error { 281 return c.HTML(http.StatusOK, `<h1>Supercollider is reloading...</h1>`) 282 }) 283 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 284 285 port := cctx.Int("port") 286 if port == 0 { 287 if cctx.Bool("use-ssl") { 288 port = 443 289 } else { 290 port = 80 291 } 292 } 293 294 wg := sync.WaitGroup{} 295 wg.Add(1) 296 // Start a loop to subscribe to events and write them to a file 297 go func() { 298 defer wg.Done() 299 outFile := cctx.String("output-file") 300 f, err := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY, 0644) 301 if err != nil { 302 log.Fatalf("failed to open output file: %+v\n", err) 303 } 304 defer f.Close() 305 since := int64(0) 306 307 evts, cancel, err := s.Events.Subscribe(ctx, "supercollider_file", func(evt *events.XRPCStreamEvent) bool { 308 return true 309 }, &since) 310 if err != nil { 311 log.Fatalf("failed to subscribe to events: %+v\n", err) 312 } 313 defer cancel() 314 315 logger.Info("writing events", "path", outFile) 316 317 header := events.EventHeader{Op: events.EvtKindMessage} 318 for { 319 select { 320 case <-ctx.Done(): 321 logger.Info("shutting down file writer") 322 err = f.Sync() 323 if err != nil { 324 logger.Error("failed to sync file", "err", err) 325 } 326 logger.Info("file writer shutdown complete") 327 return 328 case evt := <-evts: 329 if evt.Error != nil { 330 logger.Error("error in event stream", "err", evt.Error) 331 continue 332 } 333 var obj lexutil.CBOR 334 switch { 335 case evt.Error != nil: 336 header.Op = events.EvtKindErrorFrame 337 obj = evt.Error 338 case evt.RepoCommit != nil: 339 header.MsgType = "#commit" 340 obj = evt.RepoCommit 341 case evt.RepoSync != nil: 342 header.MsgType = "#sync" 343 obj = evt.RepoSync 344 case evt.RepoInfo != nil: 345 header.MsgType = "#info" 346 obj = evt.RepoInfo 347 default: 348 logger.Error("unrecognized event kind") 349 continue 350 } 351 352 if err := header.MarshalCBOR(f); err != nil { 353 logger.Error("failed to write header", "err", err) 354 } 355 356 if err := obj.MarshalCBOR(f); err != nil { 357 logger.Error("failed to write event", "err", err) 358 } 359 } 360 } 361 }() 362 363 // Start the event generation loop 364 go func() { 365 time.Sleep(time.Second * 5) 366 s.EventGenerationLoop(ctx, cancel) 367 }() 368 369 listenAddress := fmt.Sprintf(":%d", port) 370 go func() { 371 if cctx.Bool("use-ssl") { 372 err = e.StartAutoTLS(listenAddress) 373 } else { 374 err = e.Start(listenAddress) 375 } 376 if err != nil { 377 logger.Error("failed to start server", "err", err) 378 } 379 }() 380 <-ctx.Done() 381 logger.Info("shutting down server...") 382 wg.Wait() 383 logger.Info("server shutdown complete") 384 return nil 385} 386 387func Fire(cctx *cli.Context) error { 388 ctx := cctx.Context 389 ctx, cancel := context.WithCancel(ctx) 390 defer cancel() 391 392 // Trap SIGINT to trigger a shutdown. 393 signals := make(chan os.Signal, 1) 394 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 395 396 go func() { 397 select { 398 case <-signals: 399 cancel() 400 fmt.Println("shutting down on signal") 401 // Give the server some time to shutdown gracefully, then exit. 402 time.Sleep(time.Second * 5) 403 os.Exit(0) 404 case <-ctx.Done(): 405 fmt.Println("shutting down on context done") 406 } 407 }() 408 409 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 410 Level: slog.LevelInfo, 411 })) 412 413 defer func() { 414 logger.Info("main function teardown") 415 }() 416 417 logger = logger.With("source", "supercollider_main") 418 logger.Info("Starting Supercollider in Fire Mode") 419 420 // Try to read the key from disk 421 keyBytes, err := os.ReadFile(cctx.String("key-file")) 422 if err != nil { 423 logger.Warn("failed to read key from disk, creating new key", "err", err.Error()) 424 } 425 426 var privkey *godid.PrivKey 427 if len(keyBytes) == 0 { 428 privkey, err = godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1) 429 if err != nil { 430 log.Fatalf("failed to generate privkey: %+v\n", err) 431 } 432 rawKey, err := privkey.RawBytes() 433 if err != nil { 434 log.Fatalf("failed to serialize privkey: %+v\n", err) 435 } 436 err = os.WriteFile(cctx.String("key-file"), rawKey, 0644) 437 if err != nil { 438 log.Fatalf("failed to write privkey to disk: %+v\n", err) 439 } 440 } else { 441 privkey, err = godid.PrivKeyFromRawBytes(godid.KeyTypeSecp256k1, keyBytes) 442 if err != nil { 443 log.Fatalf("failed to parse privkey from disk: %+v\n", err) 444 } 445 } 446 447 vMethod, err := godid.VerificationMethodFromKey(privkey.Public()) 448 if err != nil { 449 log.Fatalf("failed to generate verification method: %+v\n", err) 450 } 451 452 // Instantiate Server 453 s := &Server{ 454 Logger: logger, 455 EnableSSL: cctx.Bool("use-ssl"), 456 Host: cctx.String("hostname"), 457 MultibaseKey: *vMethod.PublicKeyMultibase, 458 MaxEventsPerSecond: cctx.Int("events-per-second"), 459 PlaybackFile: cctx.String("input-file"), 460 } 461 462 // HTTP Server setup and Middleware Plumbing 463 e := echo.New() 464 e.AutoTLSManager.Cache = autocert.DirCache("/var/www/.cache") 465 pprof.Register(e) 466 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 467 Format: "method=${method}, ip=${remote_ip}, uri=${uri}, status=${status} latency=${latency_human} (ua=${user_agent})\n", 468 })) 469 470 // Configure the HTTP Error Handler to support Websocket errors 471 e.HTTPErrorHandler = func(err error, ctx echo.Context) { 472 switch err := err.(type) { 473 case *echo.HTTPError: 474 if err2 := ctx.JSON(err.Code, map[string]any{ 475 "error": err.Message, 476 }); err2 != nil { 477 logger.Error("Failed to write http error", "err", err2) 478 } 479 default: 480 sendHeader := true 481 if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" { 482 sendHeader = false 483 } 484 485 logger.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err) 486 487 if sendHeader { 488 ctx.Response().WriteHeader(500) 489 } 490 } 491 } 492 493 e.GET("/", func(c echo.Context) error { 494 return c.HTML(http.StatusOK, `<h1>Supercollider is firing...</h1>`) 495 }) 496 497 e.GET("/.well-known/did.json", s.HandleWellKnownDid) 498 e.GET("/.well-known/atproto-did", s.HandleAtprotoDid) 499 e.GET("/xrpc/com.atproto.server.describeServer", s.DescribeServerHandler) 500 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.HandleSubscribeRepos) 501 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 502 503 port := cctx.Int("port") 504 if port == 0 { 505 if cctx.Bool("use-ssl") { 506 port = 443 507 } else { 508 port = 80 509 } 510 } 511 512 listenAddress := fmt.Sprintf(":%d", port) 513 go func() { 514 if cctx.Bool("use-ssl") { 515 err = e.StartAutoTLS(listenAddress) 516 } else { 517 err = e.Start(listenAddress) 518 } 519 if err != nil { 520 logger.Error("failed to start server", "err", err) 521 } 522 }() 523 <-ctx.Done() 524 logger.Info("shutting down server") 525 return nil 526} 527 528// Configure a gorm SQLite DB with some sensible defaults 529func setupDb(p string) (*gorm.DB, error) { 530 db, err := gorm.Open(sqlite.Open(p)) 531 if err != nil { 532 return nil, fmt.Errorf("failed to open db: %w", err) 533 } 534 535 if err := db.Exec(`PRAGMA journal_mode=WAL; 536 pragma synchronous = normal; 537 pragma temp_store = memory; 538 pragma mmap_size = 30000000000;`, 539 ).Error; err != nil { 540 return nil, fmt.Errorf("failed to set pragma modes: %w", err) 541 } 542 543 return db, nil 544} 545 546// Stand up a Repo Manager with a Web DID Resolver 547func initSpeedyRepoMan(key *godid.PrivKey) (*repomgr.RepoManager, *godid.PrivKey, error) { 548 dir, err := os.MkdirTemp("", "supercollider") 549 if err != nil { 550 return nil, nil, err 551 } 552 553 cardb, err := setupDb("file::memory:?cache=shared") 554 if err != nil { 555 return nil, nil, err 556 } 557 558 cspath := filepath.Join(dir, "carstore") 559 if err := os.Mkdir(cspath, 0775); err != nil { 560 return nil, nil, err 561 } 562 563 cs, err := carstore.NewCarStore(cardb, []string{cspath}) 564 if err != nil { 565 return nil, nil, err 566 } 567 568 mr := did.NewMultiResolver() 569 mr.AddHandler("web", &did.WebResolver{ 570 Insecure: true, 571 }) 572 573 cachedidr := plc.NewCachingDidResolver(mr, time.Minute*5, 1000) 574 575 kmgr := indexer.NewKeyManager(cachedidr, key) 576 577 repoman := repomgr.NewRepoManager(cs, kmgr) 578 579 return repoman, key, nil 580} 581 582// HandleRepoEvent is the callback for the RepoManager 583func (s *Server) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) { 584 outops := make([]*comatproto.SyncSubscribeRepos_RepoOp, 0, len(evt.Ops)) 585 for _, op := range evt.Ops { 586 link := (*lexutil.LexLink)(op.RecCid) 587 outops = append(outops, &comatproto.SyncSubscribeRepos_RepoOp{ 588 Path: op.Collection + "/" + op.Rkey, 589 Action: string(op.Kind), 590 Cid: link, 591 }) 592 } 593 594 if err := s.Events.AddEvent(ctx, &events.XRPCStreamEvent{ 595 RepoCommit: &comatproto.SyncSubscribeRepos_Commit{ 596 Repo: s.Dids[evt.User-1], 597 Blocks: evt.RepoSlice, 598 Commit: lexutil.LexLink(evt.NewRoot), 599 Time: time.Now().Format(util.ISO8601), 600 Ops: outops, 601 TooBig: false, 602 }, 603 PrivUid: evt.User, 604 }); err != nil { 605 s.Logger.Error("failed to add event", "err", err) 606 } 607} 608 609// EventGenerationLoop is the main loop for generating events 610func (s *Server) EventGenerationLoop(ctx context.Context, cancel context.CancelFunc) { 611 defer cancel() 612 s.Logger.Info(fmt.Sprintf("starting event generation for %d events", s.TotalDesiredEvents)) 613 614 s.Logger.Info(fmt.Sprintf("initializing %d fake users", len(s.Dids))) 615 for i, did := range s.Dids { 616 uid := models.Uid(i + 1) 617 if err := s.RepoManager.InitNewActor(ctx, uid, strings.TrimPrefix(did, "did:web:"), did, "catdog", "", ""); err != nil { 618 log.Fatalf("failed to init actor: %+v\n", err) 619 } 620 } 621 622 s.Logger.Info("generating events", "count", s.TotalDesiredEvents) 623 624 for i := 0; i < s.TotalDesiredEvents; i++ { 625 text := fake.SentencesN(3) 626 // Trim to 300 chars 627 if len(text) > 300 { 628 text = text[:300] 629 } 630 _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i%len(s.Dids)+1), "app.bsky.feed.post", &bsky.FeedPost{ 631 CreatedAt: time.Now().Format(util.ISO8601), 632 Text: text, 633 }) 634 if err != nil { 635 s.Logger.Error("failed to create record", "err", err) 636 } else { 637 eventsGeneratedCounter.Inc() 638 } 639 select { 640 case <-ctx.Done(): 641 s.Logger.Info("shutting down event generation loop on context done") 642 return 643 default: 644 } 645 } 646 647 s.Logger.Info("event generation complete, shutting down") 648 return 649} 650 651// ATProto Handlers for DID Web 652 653// HandleAtprotoDid handles reverse-lookups (handle -> DID) 654func (s *Server) HandleAtprotoDid(c echo.Context) error { 655 return c.String(http.StatusOK, "did:web:"+c.Request().Host) 656} 657 658// HandleWellKnownDid handles DID document lookups (DID -> identity) 659func (s *Server) HandleWellKnownDid(c echo.Context) error { 660 return c.JSON(http.StatusOK, map[string]any{ 661 "@context": []string{"https://www.w3.org/ns/did/v1"}, 662 "id": "did:web:" + c.Request().Host, 663 "alsoKnownAs": []string{ 664 "at://" + c.Request().Host, 665 }, 666 "verificationMethod": []map[string]any{ 667 { 668 "id": "#atproto", 669 "type": godid.KeyTypeSecp256k1, 670 "controller": "did:web:" + s.Host, 671 "publicKeyMultibase": s.MultibaseKey, 672 }, 673 }, 674 "service": []map[string]any{ 675 { 676 "id": "#atproto_pds", 677 "type": "AtprotoPersonalDataServer", 678 "serviceEndpoint": "http://" + s.Host, 679 }, 680 }, 681 }) 682} 683 684// DescribeServerHandler identifies the server as a PDS (even though it isn't) 685func (s *Server) DescribeServerHandler(c echo.Context) error { 686 invcode := false 687 resp := &atproto.ServerDescribeServer_Output{ 688 InviteCodeRequired: &invcode, 689 AvailableUserDomains: []string{}, 690 Links: &atproto.ServerDescribeServer_Links{}, 691 } 692 return c.JSON(http.StatusOK, resp) 693} 694 695// HandleSubscribeRepos opens and manages a websocket connection for subscribing to repo events 696func (s *Server) HandleSubscribeRepos(c echo.Context) error { 697 s.Logger.Info("new repo subscription", "remote", c.Request().RemoteAddr) 698 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 699 if err != nil { 700 return err 701 } 702 defer conn.Close() 703 704 ctx := c.Request().Context() 705 706 limiter := rate.NewLimiter(rate.Limit(s.MaxEventsPerSecond), 10) 707 708 f, err := os.Open(s.PlaybackFile) 709 if err != nil { 710 s.Logger.Error("failed to open playback file", "err", err) 711 return err 712 } 713 defer f.Close() 714 715 // Set a ping handler 716 conn.SetPingHandler(func(message string) error { 717 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 718 if err == websocket.ErrCloseSent { 719 return nil 720 } else if e, ok := err.(net.Error); ok && e.Temporary() { 721 return nil 722 } 723 return err 724 }) 725 726 // Start a goroutine to read messages from the client and discard them. 727 go func() { 728 for { 729 _, _, err := conn.ReadMessage() 730 if err != nil { 731 return 732 } 733 } 734 }() 735 736 header := cbg.Deferred{} 737 obj := cbg.Deferred{} 738 for { 739 wc, err := conn.NextWriter(websocket.BinaryMessage) 740 if err != nil { 741 return err 742 } 743 744 limiter.Wait(ctx) 745 746 if err := header.UnmarshalCBOR(f); err != nil { 747 return fmt.Errorf("failed to read header: %w", err) 748 } 749 if err := obj.UnmarshalCBOR(f); err != nil { 750 return fmt.Errorf("failed to read event: %w", err) 751 } 752 if err := header.MarshalCBOR(wc); err != nil { 753 return fmt.Errorf("failed to write header: %w", err) 754 } 755 if err := obj.MarshalCBOR(wc); err != nil { 756 return fmt.Errorf("failed to write event: %w", err) 757 } 758 if err := wc.Close(); err != nil { 759 return fmt.Errorf("failed to flush-close our event write: %w", err) 760 } 761 eventsSentCounter.Inc() 762 } 763}