1package main
2
import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"log"
	"log/slog"
	"net"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/carstore"
	"github.com/bluesky-social/indigo/did"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/yolopersist"
	"github.com/bluesky-social/indigo/indexer"
	lexutil "github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/models"
	"github.com/bluesky-social/indigo/plc"
	"github.com/bluesky-social/indigo/repomgr"
	"github.com/bluesky-social/indigo/util"
	"github.com/carlmjohnson/versioninfo"
	petname "github.com/dustinkirkland/golang-petname"
	"github.com/gorilla/websocket"
	"github.com/icrowley/fake"
	"github.com/labstack/echo-contrib/pprof"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v2"
	cbg "github.com/whyrusleeping/cbor-gen"
	godid "github.com/whyrusleeping/go-did"
	_ "go.uber.org/automaxprocs"
	"golang.org/x/crypto/acme/autocert"
	"golang.org/x/time/rate"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)
55
56var eventsGeneratedCounter = promauto.NewCounter(prometheus.CounterOpts{
57 Name: "supercollider_events_generated_total",
58 Help: "The total number of events generated",
59})
60
61var eventsSentCounter = promauto.NewCounter(prometheus.CounterOpts{
62 Name: "supercollider_events_sent_total",
63 Help: "The total number of events sent",
64})
65
66type Server struct {
67 Events *events.EventManager
68 Dids []string
69 Host string
70 EnableSSL bool
71 Logger *slog.Logger
72 EventControl chan string
73 MultibaseKey string
74 RepoManager *repomgr.RepoManager
75
76 // Event Loop Parameters
77 TotalDesiredEvents int
78 MaxEventsPerSecond int
79
80 PlaybackFile string
81}
82
83func main() {
84 ctx := context.Background()
85 ctx, cancel := context.WithCancel(ctx)
86 defer cancel()
87
88 app := cli.App{
89 Name: "supercollider",
90 Usage: "atproto event noise-maker for Relay load testing",
91 Version: versioninfo.Short(),
92 }
93
94 app.Flags = []cli.Flag{
95 &cli.StringFlag{
96 Name: "hostname",
97 Usage: "hostname of this server (forward *.hostname DNS records to this server)",
98 Value: "supercollider.jazco.io",
99 EnvVars: []string{"SUPERCOLLIDER_HOST"},
100 },
101 &cli.BoolFlag{
102 Name: "use-ssl",
103 Usage: "listen on port 443 and use SSL (needs to be run as root and have external DNS setup)",
104 Value: false,
105 EnvVars: []string{"SUPERCOLLIDER_USE_SSL"},
106 },
107 &cli.IntFlag{
108 Name: "port",
109 Usage: "port for the HTTP(S) server to listen on (defaults to 80 if not using SSL, 443 if using SSL)",
110 EnvVars: []string{"SUPERCOLLIDER_PORT"},
111 },
112
113 &cli.StringFlag{
114 Name: "key-file",
115 Usage: "file to store the private key used to sign events",
116 Value: "key.raw",
117 EnvVars: []string{"KEY_FILE"},
118 },
119 }
120
121 app.Commands = []*cli.Command{
122 {
123 Name: "reload",
124 Usage: "reload events from a file and write them to an output file",
125 Action: Reload,
126 Flags: append([]cli.Flag{
127 &cli.IntFlag{
128 Name: "num-users",
129 Usage: "number of fake users to produce events for",
130 Value: 100,
131 EnvVars: []string{"NUM_USERS"},
132 },
133 &cli.IntFlag{
134 Name: "total-events",
135 Usage: "total number of events to generate",
136 Value: 1_000_000,
137 EnvVars: []string{"TOTAL_EVENTS"},
138 },
139 &cli.StringFlag{
140 Name: "output-file",
141 Usage: "output file for the generated events",
142 Value: "events_out.cbor",
143 EnvVars: []string{"OUTPUT_FILE"},
144 },
145 }, app.Flags...),
146 },
147 {
148 Name: "fire",
149 Usage: "fire events from a file over a websocket",
150 Action: Fire,
151 Flags: append([]cli.Flag{
152 &cli.IntFlag{
153 Name: "events-per-second",
154 Usage: "maximum number of events to generate per second",
155 Value: 300,
156 EnvVars: []string{"EVENTS_PER_SECOND"},
157 },
158 &cli.StringFlag{
159 Name: "input-file",
160 Usage: "input file for the generated events (if set, will read events from this file instead of generating them)",
161 Value: "events_in.cbor",
162 EnvVars: []string{"INPUT_FILE"},
163 },
164 }, app.Flags...),
165 },
166 }
167
168 err := app.Run(os.Args)
169 if err != nil {
170 log.Fatal(err)
171 }
172}
173
174func Reload(cctx *cli.Context) error {
175 ctx := cctx.Context
176 ctx, cancel := context.WithCancel(ctx)
177 defer cancel()
178
179 // Trap SIGINT to trigger a shutdown.
180 signals := make(chan os.Signal, 1)
181 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
182
183 go func() {
184 select {
185 case <-signals:
186 cancel()
187 fmt.Println("shutting down on signal")
188 // Give the server some time to shutdown gracefully, then exit.
189 time.Sleep(time.Second * 5)
190 os.Exit(0)
191 case <-ctx.Done():
192 fmt.Println("shutting down on context done")
193 }
194 }()
195
196 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
197 Level: slog.LevelInfo,
198 }))
199 defer func() {
200 logger.Info("main function teardown")
201 }()
202
203 logger = logger.With("source", "supercollider_main")
204
205 logger.Info("Starting Supercollider in Reload Mode")
206 logger.Info(fmt.Sprintf("Generating %d total events and writing them to %s",
207 cctx.Int("total-events"), cctx.String("output-file")))
208
209 em := events.NewEventManager(yolopersist.NewYoloPersister())
210
211 // Try to read the key from disk
212 keyBytes, err := os.ReadFile(cctx.String("key-file"))
213 if err != nil {
214 logger.Warn("failed to read key from disk, creating new key", "err", err.Error())
215 }
216
217 var privkey *godid.PrivKey
218 if len(keyBytes) == 0 {
219 privkey, err = godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1)
220 if err != nil {
221 log.Fatalf("failed to generate privkey: %+v\n", err)
222 }
223 rawKey, err := privkey.RawBytes()
224 if err != nil {
225 log.Fatalf("failed to serialize privkey: %+v\n", err)
226 }
227 err = os.WriteFile(cctx.String("key-file"), rawKey, 0644)
228 if err != nil {
229 log.Fatalf("failed to write privkey to disk: %+v\n", err)
230 }
231 } else {
232 privkey, err = godid.PrivKeyFromRawBytes(godid.KeyTypeSecp256k1, keyBytes)
233 if err != nil {
234 log.Fatalf("failed to parse privkey from disk: %+v\n", err)
235 }
236 }
237
238 // Configure the repomanager and keypair for our fake accounts
239 repoman, privkey, err := initSpeedyRepoMan(privkey)
240 if err != nil {
241 log.Fatalf("failed to init repo manager: %+v\n", err)
242 }
243
244 vMethod, err := godid.VerificationMethodFromKey(privkey.Public())
245 if err != nil {
246 log.Fatalf("failed to generate verification method: %+v\n", err)
247 }
248
249 // Initialize fake account DIDs
250 dids := []string{}
251 for i := 0; i < cctx.Int("num-users"); i++ {
252 did := fmt.Sprintf("did:web:%s.%s", petname.Generate(4, "-"), cctx.String("hostname"))
253 dids = append(dids, did)
254 }
255
256 // Instantiate Server
257 s := &Server{
258 Logger: logger,
259 EnableSSL: cctx.Bool("use-ssl"),
260 Host: cctx.String("hostname"),
261
262 RepoManager: repoman,
263 MultibaseKey: *vMethod.PublicKeyMultibase,
264 Dids: dids,
265
266 Events: em,
267 TotalDesiredEvents: cctx.Int("total-events"),
268 }
269
270 repoman.SetEventHandler(s.HandleRepoEvent, false)
271
272 // HTTP Server setup and Middleware Plumbing
273 e := echo.New()
274 e.AutoTLSManager.Cache = autocert.DirCache("/var/www/.cache")
275 pprof.Register(e)
276 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
277 Format: "method=${method}, ip=${remote_ip}, uri=${uri}, status=${status} latency=${latency_human} (ua=${user_agent})\n",
278 }))
279
280 e.GET("/", func(c echo.Context) error {
281 return c.HTML(http.StatusOK, `<h1>Supercollider is reloading...</h1>`)
282 })
283 e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
284
285 port := cctx.Int("port")
286 if port == 0 {
287 if cctx.Bool("use-ssl") {
288 port = 443
289 } else {
290 port = 80
291 }
292 }
293
294 wg := sync.WaitGroup{}
295 wg.Add(1)
296 // Start a loop to subscribe to events and write them to a file
297 go func() {
298 defer wg.Done()
299 outFile := cctx.String("output-file")
300 f, err := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY, 0644)
301 if err != nil {
302 log.Fatalf("failed to open output file: %+v\n", err)
303 }
304 defer f.Close()
305 since := int64(0)
306
307 evts, cancel, err := s.Events.Subscribe(ctx, "supercollider_file", func(evt *events.XRPCStreamEvent) bool {
308 return true
309 }, &since)
310 if err != nil {
311 log.Fatalf("failed to subscribe to events: %+v\n", err)
312 }
313 defer cancel()
314
315 logger.Info("writing events", "path", outFile)
316
317 header := events.EventHeader{Op: events.EvtKindMessage}
318 for {
319 select {
320 case <-ctx.Done():
321 logger.Info("shutting down file writer")
322 err = f.Sync()
323 if err != nil {
324 logger.Error("failed to sync file", "err", err)
325 }
326 logger.Info("file writer shutdown complete")
327 return
328 case evt := <-evts:
329 if evt.Error != nil {
330 logger.Error("error in event stream", "err", evt.Error)
331 continue
332 }
333 var obj lexutil.CBOR
334 switch {
335 case evt.Error != nil:
336 header.Op = events.EvtKindErrorFrame
337 obj = evt.Error
338 case evt.RepoCommit != nil:
339 header.MsgType = "#commit"
340 obj = evt.RepoCommit
341 case evt.RepoSync != nil:
342 header.MsgType = "#sync"
343 obj = evt.RepoSync
344 case evt.RepoInfo != nil:
345 header.MsgType = "#info"
346 obj = evt.RepoInfo
347 default:
348 logger.Error("unrecognized event kind")
349 continue
350 }
351
352 if err := header.MarshalCBOR(f); err != nil {
353 logger.Error("failed to write header", "err", err)
354 }
355
356 if err := obj.MarshalCBOR(f); err != nil {
357 logger.Error("failed to write event", "err", err)
358 }
359 }
360 }
361 }()
362
363 // Start the event generation loop
364 go func() {
365 time.Sleep(time.Second * 5)
366 s.EventGenerationLoop(ctx, cancel)
367 }()
368
369 listenAddress := fmt.Sprintf(":%d", port)
370 go func() {
371 if cctx.Bool("use-ssl") {
372 err = e.StartAutoTLS(listenAddress)
373 } else {
374 err = e.Start(listenAddress)
375 }
376 if err != nil {
377 logger.Error("failed to start server", "err", err)
378 }
379 }()
380 <-ctx.Done()
381 logger.Info("shutting down server...")
382 wg.Wait()
383 logger.Info("server shutdown complete")
384 return nil
385}
386
387func Fire(cctx *cli.Context) error {
388 ctx := cctx.Context
389 ctx, cancel := context.WithCancel(ctx)
390 defer cancel()
391
392 // Trap SIGINT to trigger a shutdown.
393 signals := make(chan os.Signal, 1)
394 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
395
396 go func() {
397 select {
398 case <-signals:
399 cancel()
400 fmt.Println("shutting down on signal")
401 // Give the server some time to shutdown gracefully, then exit.
402 time.Sleep(time.Second * 5)
403 os.Exit(0)
404 case <-ctx.Done():
405 fmt.Println("shutting down on context done")
406 }
407 }()
408
409 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
410 Level: slog.LevelInfo,
411 }))
412
413 defer func() {
414 logger.Info("main function teardown")
415 }()
416
417 logger = logger.With("source", "supercollider_main")
418 logger.Info("Starting Supercollider in Fire Mode")
419
420 // Try to read the key from disk
421 keyBytes, err := os.ReadFile(cctx.String("key-file"))
422 if err != nil {
423 logger.Warn("failed to read key from disk, creating new key", "err", err.Error())
424 }
425
426 var privkey *godid.PrivKey
427 if len(keyBytes) == 0 {
428 privkey, err = godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1)
429 if err != nil {
430 log.Fatalf("failed to generate privkey: %+v\n", err)
431 }
432 rawKey, err := privkey.RawBytes()
433 if err != nil {
434 log.Fatalf("failed to serialize privkey: %+v\n", err)
435 }
436 err = os.WriteFile(cctx.String("key-file"), rawKey, 0644)
437 if err != nil {
438 log.Fatalf("failed to write privkey to disk: %+v\n", err)
439 }
440 } else {
441 privkey, err = godid.PrivKeyFromRawBytes(godid.KeyTypeSecp256k1, keyBytes)
442 if err != nil {
443 log.Fatalf("failed to parse privkey from disk: %+v\n", err)
444 }
445 }
446
447 vMethod, err := godid.VerificationMethodFromKey(privkey.Public())
448 if err != nil {
449 log.Fatalf("failed to generate verification method: %+v\n", err)
450 }
451
452 // Instantiate Server
453 s := &Server{
454 Logger: logger,
455 EnableSSL: cctx.Bool("use-ssl"),
456 Host: cctx.String("hostname"),
457 MultibaseKey: *vMethod.PublicKeyMultibase,
458 MaxEventsPerSecond: cctx.Int("events-per-second"),
459 PlaybackFile: cctx.String("input-file"),
460 }
461
462 // HTTP Server setup and Middleware Plumbing
463 e := echo.New()
464 e.AutoTLSManager.Cache = autocert.DirCache("/var/www/.cache")
465 pprof.Register(e)
466 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
467 Format: "method=${method}, ip=${remote_ip}, uri=${uri}, status=${status} latency=${latency_human} (ua=${user_agent})\n",
468 }))
469
470 // Configure the HTTP Error Handler to support Websocket errors
471 e.HTTPErrorHandler = func(err error, ctx echo.Context) {
472 switch err := err.(type) {
473 case *echo.HTTPError:
474 if err2 := ctx.JSON(err.Code, map[string]any{
475 "error": err.Message,
476 }); err2 != nil {
477 logger.Error("Failed to write http error", "err", err2)
478 }
479 default:
480 sendHeader := true
481 if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" {
482 sendHeader = false
483 }
484
485 logger.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err)
486
487 if sendHeader {
488 ctx.Response().WriteHeader(500)
489 }
490 }
491 }
492
493 e.GET("/", func(c echo.Context) error {
494 return c.HTML(http.StatusOK, `<h1>Supercollider is firing...</h1>`)
495 })
496
497 e.GET("/.well-known/did.json", s.HandleWellKnownDid)
498 e.GET("/.well-known/atproto-did", s.HandleAtprotoDid)
499 e.GET("/xrpc/com.atproto.server.describeServer", s.DescribeServerHandler)
500 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.HandleSubscribeRepos)
501 e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
502
503 port := cctx.Int("port")
504 if port == 0 {
505 if cctx.Bool("use-ssl") {
506 port = 443
507 } else {
508 port = 80
509 }
510 }
511
512 listenAddress := fmt.Sprintf(":%d", port)
513 go func() {
514 if cctx.Bool("use-ssl") {
515 err = e.StartAutoTLS(listenAddress)
516 } else {
517 err = e.Start(listenAddress)
518 }
519 if err != nil {
520 logger.Error("failed to start server", "err", err)
521 }
522 }()
523 <-ctx.Done()
524 logger.Info("shutting down server")
525 return nil
526}
527
528// Configure a gorm SQLite DB with some sensible defaults
529func setupDb(p string) (*gorm.DB, error) {
530 db, err := gorm.Open(sqlite.Open(p))
531 if err != nil {
532 return nil, fmt.Errorf("failed to open db: %w", err)
533 }
534
535 if err := db.Exec(`PRAGMA journal_mode=WAL;
536 pragma synchronous = normal;
537 pragma temp_store = memory;
538 pragma mmap_size = 30000000000;`,
539 ).Error; err != nil {
540 return nil, fmt.Errorf("failed to set pragma modes: %w", err)
541 }
542
543 return db, nil
544}
545
546// Stand up a Repo Manager with a Web DID Resolver
547func initSpeedyRepoMan(key *godid.PrivKey) (*repomgr.RepoManager, *godid.PrivKey, error) {
548 dir, err := os.MkdirTemp("", "supercollider")
549 if err != nil {
550 return nil, nil, err
551 }
552
553 cardb, err := setupDb("file::memory:?cache=shared")
554 if err != nil {
555 return nil, nil, err
556 }
557
558 cspath := filepath.Join(dir, "carstore")
559 if err := os.Mkdir(cspath, 0775); err != nil {
560 return nil, nil, err
561 }
562
563 cs, err := carstore.NewCarStore(cardb, []string{cspath})
564 if err != nil {
565 return nil, nil, err
566 }
567
568 mr := did.NewMultiResolver()
569 mr.AddHandler("web", &did.WebResolver{
570 Insecure: true,
571 })
572
573 cachedidr := plc.NewCachingDidResolver(mr, time.Minute*5, 1000)
574
575 kmgr := indexer.NewKeyManager(cachedidr, key)
576
577 repoman := repomgr.NewRepoManager(cs, kmgr)
578
579 return repoman, key, nil
580}
581
582// HandleRepoEvent is the callback for the RepoManager
583func (s *Server) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) {
584 outops := make([]*comatproto.SyncSubscribeRepos_RepoOp, 0, len(evt.Ops))
585 for _, op := range evt.Ops {
586 link := (*lexutil.LexLink)(op.RecCid)
587 outops = append(outops, &comatproto.SyncSubscribeRepos_RepoOp{
588 Path: op.Collection + "/" + op.Rkey,
589 Action: string(op.Kind),
590 Cid: link,
591 })
592 }
593
594 if err := s.Events.AddEvent(ctx, &events.XRPCStreamEvent{
595 RepoCommit: &comatproto.SyncSubscribeRepos_Commit{
596 Repo: s.Dids[evt.User-1],
597 Blocks: evt.RepoSlice,
598 Commit: lexutil.LexLink(evt.NewRoot),
599 Time: time.Now().Format(util.ISO8601),
600 Ops: outops,
601 TooBig: false,
602 },
603 PrivUid: evt.User,
604 }); err != nil {
605 s.Logger.Error("failed to add event", "err", err)
606 }
607}
608
609// EventGenerationLoop is the main loop for generating events
610func (s *Server) EventGenerationLoop(ctx context.Context, cancel context.CancelFunc) {
611 defer cancel()
612 s.Logger.Info(fmt.Sprintf("starting event generation for %d events", s.TotalDesiredEvents))
613
614 s.Logger.Info(fmt.Sprintf("initializing %d fake users", len(s.Dids)))
615 for i, did := range s.Dids {
616 uid := models.Uid(i + 1)
617 if err := s.RepoManager.InitNewActor(ctx, uid, strings.TrimPrefix(did, "did:web:"), did, "catdog", "", ""); err != nil {
618 log.Fatalf("failed to init actor: %+v\n", err)
619 }
620 }
621
622 s.Logger.Info("generating events", "count", s.TotalDesiredEvents)
623
624 for i := 0; i < s.TotalDesiredEvents; i++ {
625 text := fake.SentencesN(3)
626 // Trim to 300 chars
627 if len(text) > 300 {
628 text = text[:300]
629 }
630 _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i%len(s.Dids)+1), "app.bsky.feed.post", &bsky.FeedPost{
631 CreatedAt: time.Now().Format(util.ISO8601),
632 Text: text,
633 })
634 if err != nil {
635 s.Logger.Error("failed to create record", "err", err)
636 } else {
637 eventsGeneratedCounter.Inc()
638 }
639 select {
640 case <-ctx.Done():
641 s.Logger.Info("shutting down event generation loop on context done")
642 return
643 default:
644 }
645 }
646
647 s.Logger.Info("event generation complete, shutting down")
648 return
649}
650
651// ATProto Handlers for DID Web
652
653// HandleAtprotoDid handles reverse-lookups (handle -> DID)
654func (s *Server) HandleAtprotoDid(c echo.Context) error {
655 return c.String(http.StatusOK, "did:web:"+c.Request().Host)
656}
657
658// HandleWellKnownDid handles DID document lookups (DID -> identity)
659func (s *Server) HandleWellKnownDid(c echo.Context) error {
660 return c.JSON(http.StatusOK, map[string]any{
661 "@context": []string{"https://www.w3.org/ns/did/v1"},
662 "id": "did:web:" + c.Request().Host,
663 "alsoKnownAs": []string{
664 "at://" + c.Request().Host,
665 },
666 "verificationMethod": []map[string]any{
667 {
668 "id": "#atproto",
669 "type": godid.KeyTypeSecp256k1,
670 "controller": "did:web:" + s.Host,
671 "publicKeyMultibase": s.MultibaseKey,
672 },
673 },
674 "service": []map[string]any{
675 {
676 "id": "#atproto_pds",
677 "type": "AtprotoPersonalDataServer",
678 "serviceEndpoint": "http://" + s.Host,
679 },
680 },
681 })
682}
683
684// DescribeServerHandler identifies the server as a PDS (even though it isn't)
685func (s *Server) DescribeServerHandler(c echo.Context) error {
686 invcode := false
687 resp := &atproto.ServerDescribeServer_Output{
688 InviteCodeRequired: &invcode,
689 AvailableUserDomains: []string{},
690 Links: &atproto.ServerDescribeServer_Links{},
691 }
692 return c.JSON(http.StatusOK, resp)
693}
694
695// HandleSubscribeRepos opens and manages a websocket connection for subscribing to repo events
696func (s *Server) HandleSubscribeRepos(c echo.Context) error {
697 s.Logger.Info("new repo subscription", "remote", c.Request().RemoteAddr)
698 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10)
699 if err != nil {
700 return err
701 }
702 defer conn.Close()
703
704 ctx := c.Request().Context()
705
706 limiter := rate.NewLimiter(rate.Limit(s.MaxEventsPerSecond), 10)
707
708 f, err := os.Open(s.PlaybackFile)
709 if err != nil {
710 s.Logger.Error("failed to open playback file", "err", err)
711 return err
712 }
713 defer f.Close()
714
715 // Set a ping handler
716 conn.SetPingHandler(func(message string) error {
717 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
718 if err == websocket.ErrCloseSent {
719 return nil
720 } else if e, ok := err.(net.Error); ok && e.Temporary() {
721 return nil
722 }
723 return err
724 })
725
726 // Start a goroutine to read messages from the client and discard them.
727 go func() {
728 for {
729 _, _, err := conn.ReadMessage()
730 if err != nil {
731 return
732 }
733 }
734 }()
735
736 header := cbg.Deferred{}
737 obj := cbg.Deferred{}
738 for {
739 wc, err := conn.NextWriter(websocket.BinaryMessage)
740 if err != nil {
741 return err
742 }
743
744 limiter.Wait(ctx)
745
746 if err := header.UnmarshalCBOR(f); err != nil {
747 return fmt.Errorf("failed to read header: %w", err)
748 }
749 if err := obj.UnmarshalCBOR(f); err != nil {
750 return fmt.Errorf("failed to read event: %w", err)
751 }
752 if err := header.MarshalCBOR(wc); err != nil {
753 return fmt.Errorf("failed to write header: %w", err)
754 }
755 if err := obj.MarshalCBOR(wc); err != nil {
756 return fmt.Errorf("failed to write event: %w", err)
757 }
758 if err := wc.Close(); err != nil {
759 return fmt.Errorf("failed to flush-close our event write: %w", err)
760 }
761 eventsSentCounter.Inc()
762 }
763}