porting all github actions from bluesky-social/indigo to tangled CI
at main 4.5 kB view raw
1package splitter 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "net/http" 8 "net/url" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/xrpc" 12 13 "github.com/labstack/echo/v4" 14) 15 16type HealthStatus struct { 17 Service string `json:"service,const=rainbow"` 18 Status string `json:"status"` 19 Message string `json:"msg,omitempty"` 20} 21 22func (s *Splitter) HandleHealthCheck(c echo.Context) error { 23 return c.JSON(http.StatusOK, HealthStatus{Status: "ok"}) 24} 25 26var homeMessage string = ` 27 _ _ 28 _ _ __ _(_)_ _ | |__ _____ __ __ 29| '_/ _' | | ' \| '_ \/ _ \ V V / 30|_| \__,_|_|_||_|_.__/\___/\_/\_/ 31 32This is an atproto [https://atproto.com] firehose fanout service, running the 'rainbow' codebase [https://github.com/bluesky-social/indigo] 33 34The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 35` 36 37func (s *Splitter) HandleHomeMessage(c echo.Context) error { 38 return c.String(http.StatusOK, homeMessage) 39} 40 41func (s *Splitter) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { 42 ctx := c.Request().Context() 43 var body comatproto.SyncRequestCrawl_Input 44 if err := c.Bind(&body); err != nil { 45 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("invalid body: %s", err)}) 46 } 47 if body.Hostname == "" { 48 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "must include a hostname"}) 49 } 50 51 // first forward to the upstream 52 xrpcc := xrpc.Client{ 53 Client: s.upstreamClient, 54 Host: s.conf.UpstreamHostHTTP(), 55 UserAgent: &s.conf.UserAgent, 56 } 57 58 err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body) 59 if err != nil { 60 httpError, ok := err.(*xrpc.Error) 61 if ok { 62 return c.JSON(httpError.StatusCode, xrpc.XRPCError{ErrStr: "UpstreamError", Message: fmt.Sprintf("%s", httpError.Wrapped)}) 63 } 64 return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "ProxyRequestFailed", Message: fmt.Sprintf("failed forwarding request: %s", err)}) 65 } 66 67 // if that was successful, then forward on to the other upstreams (in goroutines) 68 for _, nc := range s.nextCrawlers { 69 // intentional local copy of loop variable 70 crawler := nc.String() 71 go func() { 72 // new context to outlive original HTTP request 73 ctx := context.Background() 74 xrpcc := xrpc.Client{ 75 Client: s.peerClient, 76 Host: crawler, 77 } 78 if err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body); err != nil { 79 s.logger.Warn("failed to forward requestCrawl", "crawler", crawler, "targetHost", body.Hostname, "err", err) 80 } else { 81 s.logger.Info("successfully forwarded requestCrawl", "crawler", crawler, "targetHost", body.Hostname) 82 } 83 }() 84 } 85 86 return c.JSON(http.StatusOK, map[string]any{"success": true}) 87} 88 89// Proxies a request to the single upstream (relay) 90func (s *Splitter) ProxyRequestUpstream(c echo.Context) error { 91 u, err := url.Parse(s.conf.UpstreamHostHTTP()) 92 if err != nil { 93 return err 94 } 95 return s.ProxyRequest(c, u.Host, u.Scheme) 96} 97 98// Proxies a request to the collectiondir 99func (s *Splitter) ProxyRequestCollectionDir(c echo.Context) error { 100 u, err := url.Parse(s.conf.CollectionDirHost) 101 if err != nil { 102 return err 103 } 104 return s.ProxyRequest(c, u.Host, u.Scheme) 105} 106 107func (s *Splitter) ProxyRequest(c echo.Context, hostname, scheme string) error { 108 109 req := c.Request() 110 respWriter := c.Response() 111 112 u := req.URL 113 u.Scheme = scheme 114 u.Host = hostname 115 upstreamReq, err := http.NewRequest(req.Method, u.String(), req.Body) 116 if err != nil { 117 s.logger.Warn("proxy request failed", "err", err) 118 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"}) 119 } 120 121 // copy subset of request headers 122 for _, hdr := range []string{"Accept", "User-Agent", "Authorization", "Via", "Content-Type", "Content-Length"} { 123 val := req.Header.Get(hdr) 124 if val != "" { 125 upstreamReq.Header.Set(hdr, val) 126 } 127 } 128 129 upstreamResp, err := s.upstreamClient.Do(upstreamReq) 130 if err != nil { 131 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"}) 132 } 133 defer upstreamResp.Body.Close() 134 135 // copy a subset of response headers 136 for _, hdr := range []string{"Content-Type", "Content-Length", "Location"} { 137 val := upstreamResp.Header.Get(hdr) 138 if val != "" { 139 respWriter.Header().Set(hdr, val) 140 } 141 } 142 respWriter.WriteHeader(upstreamResp.StatusCode) 143 144 _, err = io.Copy(respWriter, upstreamResp.Body) 145 if err != nil { 146 s.logger.Error("error copying proxy body", "err", err) 147 } 148 149 return nil 150}