An atproto PDS written in Go
at push-based 92 lines 2.2 kB view raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "net/http" 7 "time" 8 9 "github.com/bluesky-social/indigo/events" 10 "github.com/bluesky-social/indigo/lex/util" 11) 12 13func (s *Server) emmitEvents(ctx context.Context) error { 14 ctx, cancel := context.WithCancel(ctx) 15 defer cancel() 16 17 logger := s.logger.With("component", "event-emmiter") 18 ident := "self" 19 var since *int64 20 // TODO: track since 21 22 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 23 return true 24 }, since) 25 if err != nil { 26 return err 27 } 28 defer evtManCancel() 29 30 header := events.EventHeader{Op: events.EvtKindMessage} 31 for evt := range evts { 32 func() { 33 if ctx.Err() != nil { 34 logger.Error("context error", "err", err) 35 return 36 } 37 38 var obj util.CBOR 39 switch { 40 case evt.Error != nil: 41 header.Op = events.EvtKindErrorFrame 42 obj = evt.Error 43 case evt.RepoCommit != nil: 44 header.MsgType = "#commit" 45 obj = evt.RepoCommit 46 case evt.RepoIdentity != nil: 47 header.MsgType = "#identity" 48 obj = evt.RepoIdentity 49 case evt.RepoAccount != nil: 50 header.MsgType = "#account" 51 obj = evt.RepoAccount 52 case evt.RepoInfo != nil: 53 header.MsgType = "#info" 54 obj = evt.RepoInfo 55 default: 56 logger.Warn("unrecognized event kind") 57 return 58 } 59 60 buf := new(bytes.Buffer) 61 62 if err := header.MarshalCBOR(buf); err != nil { 63 logger.Error("failed to marshal header to buffer", "err", err) 64 return 65 } 66 67 if err := obj.MarshalCBOR(buf); err != nil { 68 logger.Error("failed to marshal event to buffer", "err", err) 69 return 70 } 71 72 // TODO: use a HTTP client here not the default 73 _, err := http.Post(s.config.SubscribeReposServiceURL, "", buf) 74 if err != nil { 75 logger.Error("posting to web server", "error", err) 76 return 77 } 78 }() 79 } 80 81 // we should tell the relay to request a new crawl at this point if we got disconnected 82 // use a new context since the old one might be cancelled at this point 83 go func() { 84 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 85 defer retryCancel() 86 if err := s.requestCrawl(retryCtx); err != nil { 87 logger.Error("error requesting crawls", "err", err) 88 } 89 }() 90 91 return nil 92}