// Forked from hailey.at/cocoon, an atproto PDS written in Go.

1package server
2
3import (
4 "bytes"
5 "context"
6 "net/http"
7 "time"
8
9 "github.com/bluesky-social/indigo/events"
10 "github.com/bluesky-social/indigo/lex/util"
11)
12
13func (s *Server) emmitEvents(ctx context.Context) error {
14 ctx, cancel := context.WithCancel(ctx)
15 defer cancel()
16
17 logger := s.logger.With("component", "event-emmiter")
18 ident := "self"
19 var since *int64
20 // TODO: track since
21
22 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
23 return true
24 }, since)
25 if err != nil {
26 return err
27 }
28 defer evtManCancel()
29
30 header := events.EventHeader{Op: events.EvtKindMessage}
31 for evt := range evts {
32 func() {
33 if ctx.Err() != nil {
34 logger.Error("context error", "err", err)
35 return
36 }
37
38 var obj util.CBOR
39 switch {
40 case evt.Error != nil:
41 header.Op = events.EvtKindErrorFrame
42 obj = evt.Error
43 case evt.RepoCommit != nil:
44 header.MsgType = "#commit"
45 obj = evt.RepoCommit
46 case evt.RepoIdentity != nil:
47 header.MsgType = "#identity"
48 obj = evt.RepoIdentity
49 case evt.RepoAccount != nil:
50 header.MsgType = "#account"
51 obj = evt.RepoAccount
52 case evt.RepoInfo != nil:
53 header.MsgType = "#info"
54 obj = evt.RepoInfo
55 default:
56 logger.Warn("unrecognized event kind")
57 return
58 }
59
60 buf := new(bytes.Buffer)
61
62 if err := header.MarshalCBOR(buf); err != nil {
63 logger.Error("failed to marshal header to buffer", "err", err)
64 return
65 }
66
67 if err := obj.MarshalCBOR(buf); err != nil {
68 logger.Error("failed to marshal event to buffer", "err", err)
69 return
70 }
71
72 // TODO: use a HTTP client here not the default
73 _, err := http.Post(s.config.SubscribeReposServiceURL, "", buf)
74 if err != nil {
75 logger.Error("posting to web server", "error", err)
76 return
77 }
78 }()
79 }
80
81 // we should tell the relay to request a new crawl at this point if we got disconnected
82 // use a new context since the old one might be cancelled at this point
83 go func() {
84 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
85 defer retryCancel()
86 if err := s.requestCrawl(retryCtx); err != nil {
87 logger.Error("error requesting crawls", "err", err)
88 }
89 }()
90
91 return nil
92}