1package splitter
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net/http"
8 "net/url"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/xrpc"
12
13 "github.com/labstack/echo/v4"
14)
15
// HealthStatus is the JSON body written by HandleHealthCheck.
//
// NOTE(review): the `const=rainbow` option in the Service tag does not look
// like an option encoding/json acts on, so Service is presumably serialized
// with whatever value the caller assigns (empty if unset) — confirm.
type HealthStatus struct {
	// Service is serialized under the "service" key.
	Service string `json:"service,const=rainbow"`
	// Status is serialized under the "status" key; HandleHealthCheck sets "ok".
	Status string `json:"status"`
	// Message is serialized under the "msg" key and left out when empty.
	Message string `json:"msg,omitempty"`
}
21
22func (s *Splitter) HandleHealthCheck(c echo.Context) error {
23 return c.JSON(http.StatusOK, HealthStatus{Status: "ok"})
24}
25
// homeMessage is the plain-text banner served by HandleHomeMessage: an ASCII
// art logo followed by a short description of the service and the firehose
// WebSocket path.
var homeMessage string = `
 _ _
 _ _ __ _(_)_ _ | |__ _____ __ __
| '_/ _' | | ' \| '_ \/ _ \ V V /
|_| \__,_|_|_||_|_.__/\___/\_/\_/

This is an atproto [https://atproto.com] firehose fanout service, running the 'rainbow' codebase [https://github.com/bluesky-social/indigo]

The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
`
36
37func (s *Splitter) HandleHomeMessage(c echo.Context) error {
38 return c.String(http.StatusOK, homeMessage)
39}
40
41func (s *Splitter) HandleComAtprotoSyncRequestCrawl(c echo.Context) error {
42 ctx := c.Request().Context()
43 var body comatproto.SyncRequestCrawl_Input
44 if err := c.Bind(&body); err != nil {
45 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("invalid body: %s", err)})
46 }
47 if body.Hostname == "" {
48 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "must include a hostname"})
49 }
50
51 // first forward to the upstream
52 xrpcc := xrpc.Client{
53 Client: s.upstreamClient,
54 Host: s.conf.UpstreamHostHTTP(),
55 UserAgent: &s.conf.UserAgent,
56 }
57
58 err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body)
59 if err != nil {
60 httpError, ok := err.(*xrpc.Error)
61 if ok {
62 return c.JSON(httpError.StatusCode, xrpc.XRPCError{ErrStr: "UpstreamError", Message: fmt.Sprintf("%s", httpError.Wrapped)})
63 }
64 return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "ProxyRequestFailed", Message: fmt.Sprintf("failed forwarding request: %s", err)})
65 }
66
67 // if that was successful, then forward on to the other upstreams (in goroutines)
68 for _, nc := range s.nextCrawlers {
69 // intentional local copy of loop variable
70 crawler := nc.String()
71 go func() {
72 // new context to outlive original HTTP request
73 ctx := context.Background()
74 xrpcc := xrpc.Client{
75 Client: s.peerClient,
76 Host: crawler,
77 }
78 if err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body); err != nil {
79 s.logger.Warn("failed to forward requestCrawl", "crawler", crawler, "targetHost", body.Hostname, "err", err)
80 } else {
81 s.logger.Info("successfully forwarded requestCrawl", "crawler", crawler, "targetHost", body.Hostname)
82 }
83 }()
84 }
85
86 return c.JSON(http.StatusOK, map[string]any{"success": true})
87}
88
89// Proxies a request to the single upstream (relay)
90func (s *Splitter) ProxyRequestUpstream(c echo.Context) error {
91 u, err := url.Parse(s.conf.UpstreamHostHTTP())
92 if err != nil {
93 return err
94 }
95 return s.ProxyRequest(c, u.Host, u.Scheme)
96}
97
98// Proxies a request to the collectiondir
99func (s *Splitter) ProxyRequestCollectionDir(c echo.Context) error {
100 u, err := url.Parse(s.conf.CollectionDirHost)
101 if err != nil {
102 return err
103 }
104 return s.ProxyRequest(c, u.Host, u.Scheme)
105}
106
107func (s *Splitter) ProxyRequest(c echo.Context, hostname, scheme string) error {
108
109 req := c.Request()
110 respWriter := c.Response()
111
112 u := req.URL
113 u.Scheme = scheme
114 u.Host = hostname
115 upstreamReq, err := http.NewRequest(req.Method, u.String(), req.Body)
116 if err != nil {
117 s.logger.Warn("proxy request failed", "err", err)
118 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"})
119 }
120
121 // copy subset of request headers
122 for _, hdr := range []string{"Accept", "User-Agent", "Authorization", "Via", "Content-Type", "Content-Length"} {
123 val := req.Header.Get(hdr)
124 if val != "" {
125 upstreamReq.Header.Set(hdr, val)
126 }
127 }
128
129 upstreamResp, err := s.upstreamClient.Do(upstreamReq)
130 if err != nil {
131 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"})
132 }
133 defer upstreamResp.Body.Close()
134
135 // copy a subset of response headers
136 for _, hdr := range []string{"Content-Type", "Content-Length", "Location"} {
137 val := upstreamResp.Header.Get(hdr)
138 if val != "" {
139 respWriter.Header().Set(hdr, val)
140 }
141 }
142 respWriter.WriteHeader(upstreamResp.StatusCode)
143
144 _, err = io.Copy(respWriter, upstreamResp.Body)
145 if err != nil {
146 s.logger.Error("error copying proxy body", "err", err)
147 }
148
149 return nil
150}