package server

import (
	"bytes"
	"context"
	"net/http"
	"time"

	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/lex/util"
)

// emmitEvents subscribes to the server's event manager and forwards each
// received event to s.config.SubscribeReposServiceURL as an HTTP POST whose
// body is the CBOR-encoded event header immediately followed by the
// CBOR-encoded event payload.
//
// Failures on individual events (unknown kind, marshalling, HTTP) are logged
// and the event is dropped; they do not stop the loop. The function returns
// the error from Subscribe if the subscription cannot be established, and nil
// once the event channel has been closed. Just before returning nil it starts
// a background goroutine that calls s.requestCrawl with a 10 second timeout.
func (s *Server) emmitEvents(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logger := s.logger.With("component", "event-emmiter")

	ident := "self"
	var since *int64 // TODO: track since

	// The callback returns true for every event (presumably a filter that
	// accepts everything; confirm against the event manager).
	evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
	if err != nil {
		return err
	}
	defer evtManCancel()

	for evt := range evts {
		// The closure gives each event its own defer scope so the response
		// body is closed per event rather than when emmitEvents returns.
		func() {
			// Log the context's own error; the outer err from Subscribe is
			// always nil once the loop is running.
			if err := ctx.Err(); err != nil {
				logger.Error("context error", "err", err)
				return
			}

			// Build a fresh header for every event. Sharing one header across
			// iterations let an error frame's Op, or a previous event's
			// MsgType, leak into the frames that followed it.
			header := events.EventHeader{Op: events.EvtKindMessage}

			var obj util.CBOR
			switch {
			case evt.Error != nil:
				header.Op = events.EvtKindErrorFrame
				obj = evt.Error
			case evt.RepoCommit != nil:
				header.MsgType = "#commit"
				obj = evt.RepoCommit
			case evt.RepoIdentity != nil:
				header.MsgType = "#identity"
				obj = evt.RepoIdentity
			case evt.RepoAccount != nil:
				header.MsgType = "#account"
				obj = evt.RepoAccount
			case evt.RepoInfo != nil:
				header.MsgType = "#info"
				obj = evt.RepoInfo
			default:
				logger.Warn("unrecognized event kind")
				return
			}

			// Frame layout: header bytes followed directly by payload bytes.
			buf := new(bytes.Buffer)
			if err := header.MarshalCBOR(buf); err != nil {
				logger.Error("failed to marshal header to buffer", "err", err)
				return
			}
			if err := obj.MarshalCBOR(buf); err != nil {
				logger.Error("failed to marshal event to buffer", "err", err)
				return
			}

			// Bind the request to ctx so an in-flight POST is abandoned when
			// the emitter is cancelled; the default client has no timeout.
			// TODO: use a HTTP client here not the default
			req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.config.SubscribeReposServiceURL, buf)
			if err != nil {
				logger.Error("failed to build request", "err", err)
				return
			}
			// http.Post(url, "", body) sent an empty Content-Type header;
			// keep doing so in order that the receiver sees the same request.
			req.Header.Set("Content-Type", "")

			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				logger.Error("posting to web server", "error", err)
				return
			}
			// The body must always be closed or the underlying connection
			// is leaked.
			defer resp.Body.Close()

			if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
				logger.Error("unexpected status posting to web server", "status", resp.StatusCode)
			}
		}()
	}

	// we should tell the relay to request a new crawl at this point if we got disconnected
	// use a new context since the old one might be cancelled at this point
	go func() {
		retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer retryCancel()
		if err := s.requestCrawl(retryCtx); err != nil {
			logger.Error("error requesting crawls", "err", err)
		}
	}()

	return nil
}