bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm
at main 7.1 kB view raw
1package microcosm 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "net/http" 10 "net/url" 11 "strings" 12 13 atpclient "github.com/bluesky-social/indigo/atproto/atclient" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15) 16 17type MicrocosmClient struct { 18 // Inner HTTP client. May be customized after the overall [MicrocosmClient] struct is created; for example to set a default request timeout. 19 Client *http.Client 20 21 // Host URL prefix: scheme, hostname, and port. This field is required. 22 Host string 23 24 // Optional HTTP headers which will be included in all requests. Only a single value per key is included; request-level headers will override any client-level defaults. 25 Headers http.Header 26} 27 28func NewMicrocosm(host string) *MicrocosmClient { 29 // validate cfg if needed 30 return &MicrocosmClient{ 31 Client: http.DefaultClient, 32 Host: host, 33 Headers: map[string][]string{ 34 "User-Agent": []string{"microcosm-red-dwarf-server"}, 35 }, 36 } 37} 38 39func (c *MicrocosmClient) LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error { 40 // some of the code here is copied from indigo:xrpc/xrpc.go 41 42 nsid, err := syntax.ParseNSID(endpoint) 43 if err != nil { 44 return err 45 } 46 47 var body io.Reader 48 if bodyData != nil { 49 if rr, ok := bodyData.(io.Reader); ok { 50 body = rr 51 } else { 52 b, err := json.Marshal(bodyData) 53 if err != nil { 54 return err 55 } 56 57 body = bytes.NewReader(b) 58 if inputEncoding == "" { 59 inputEncoding = "application/json" 60 } 61 } 62 } 63 64 req := NewAPIRequest(method, nsid, body) 65 66 if inputEncoding != "" { 67 req.Headers.Set("Content-Type", inputEncoding) 68 } 69 70 if params != nil { 71 qp, err := atpclient.ParseParams(params) 72 if err != nil { 73 return err 74 } 75 req.QueryParams = qp 76 } 77 78 resp, err := c.Do(ctx, req) 79 if err != nil { 80 return err 81 } 82 defer resp.Body.Close() 83 84 if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { 85 var eb atpclient.ErrorBody 86 if err := json.NewDecoder(resp.Body).Decode(&eb); err != nil { 87 return &atpclient.APIError{StatusCode: resp.StatusCode} 88 } 89 return eb.APIError(resp.StatusCode) 90 } 91 92 if out == nil { 93 // drain body before returning 94 io.ReadAll(resp.Body) 95 return nil 96 } 97 98 if buf, ok := out.(*bytes.Buffer); ok { 99 if resp.ContentLength < 0 { 100 _, err := io.Copy(buf, resp.Body) 101 if err != nil { 102 return fmt.Errorf("reading response body: %w", err) 103 } 104 } else { 105 n, err := io.CopyN(buf, resp.Body, resp.ContentLength) 106 if err != nil { 107 return fmt.Errorf("reading length delimited response body (%d < %d): %w", n, resp.ContentLength, err) 108 } 109 } 110 } else { 111 if err := json.NewDecoder(resp.Body).Decode(out); err != nil { 112 return fmt.Errorf("failed decoding JSON response body: %w", err) 113 } 114 } 115 116 return nil 117} 118 119func (c *MicrocosmClient) Do(ctx context.Context, req *atpclient.APIRequest) (*http.Response, error) { 120 121 if c.Client == nil { 122 c.Client = http.DefaultClient 123 } 124 125 httpReq, err := req.HTTPRequest(ctx, c.Host, c.Headers) 126 if err != nil { 127 return nil, err 128 } 129 130 var resp *http.Response 131 resp, err = c.Client.Do(httpReq) 132 if err != nil { 133 return nil, err 134 } 135 return resp, nil 136} 137 138func NewAPIRequest(method string, endpoint syntax.NSID, body io.Reader) *atpclient.APIRequest { 139 req := atpclient.APIRequest{ 140 Method: method, 141 Endpoint: endpoint, 142 Headers: map[string][]string{}, 143 QueryParams: map[string][]string{}, 144 } 145 146 // logic to turn "whatever io.Reader we are handed" in to something relatively re-tryable (using GetBody) 147 if body != nil { 148 // NOTE: http.NewRequestWithContext already handles GetBody() as well as ContentLength for specific types like bytes.Buffer and strings.Reader. We just want to add io.Seeker here, for things like files-on-disk. 149 switch v := body.(type) { 150 case io.Seeker: 151 req.Body = io.NopCloser(body) 152 req.GetBody = func() (io.ReadCloser, error) { 153 v.Seek(0, 0) 154 return io.NopCloser(body), nil 155 } 156 default: 157 req.Body = body 158 } 159 } 160 return &req 161} 162 163// PathDo performs an HTTP request to an arbitrary path (no /xrpc/ prefix enforcement). 164func (c *MicrocosmClient) PathDo(ctx context.Context, method string, inputEncoding string, path string, params map[string]any, bodyData any, out any) error { 165 // 1. Prepare the request body 166 var body io.Reader 167 if bodyData != nil { 168 if rr, ok := bodyData.(io.Reader); ok { 169 body = rr 170 } else { 171 b, err := json.Marshal(bodyData) 172 if err != nil { 173 return err 174 } 175 body = bytes.NewReader(b) 176 if inputEncoding == "" { 177 inputEncoding = "application/json" 178 } 179 } 180 } 181 182 // 2. Construct the full URL 183 // We handle slash joining carefully to avoid double slashes or missing slashes 184 baseURL := strings.TrimRight(c.Host, "/") 185 cleanPath := strings.TrimLeft(path, "/") 186 fullURLStr := fmt.Sprintf("%s/%s", baseURL, cleanPath) 187 188 // 3. Prepare Query Parameters 189 // We reuse the indigo parser for convenience, but we must apply them manually to the URL 190 var queryParams url.Values 191 if params != nil { 192 qp, err := atpclient.ParseParams(params) 193 if err != nil { 194 return err 195 } 196 queryParams = make(url.Values) 197 for k, v := range qp { 198 for _, val := range v { 199 queryParams.Add(k, val) 200 } 201 } 202 } 203 204 if len(queryParams) > 0 { 205 // Verify the URL parses correctly so we can attach params 206 u, err := url.Parse(fullURLStr) 207 if err != nil { 208 return fmt.Errorf("invalid url: %w", err) 209 } 210 u.RawQuery = queryParams.Encode() 211 fullURLStr = u.String() 212 } 213 214 // 4. Create the Standard HTTP Request 215 req, err := http.NewRequestWithContext(ctx, method, fullURLStr, body) 216 if err != nil { 217 return err 218 } 219 220 // 5. Apply Headers 221 // Apply default client headers 222 for k, v := range c.Headers { 223 for _, val := range v { 224 req.Header.Add(k, val) 225 } 226 } 227 // Apply specific content type if set 228 if inputEncoding != "" { 229 req.Header.Set("Content-Type", inputEncoding) 230 } 231 232 // 6. Execute Request 233 // Note: We access c.Client directly because c.Do() expects an XRPC-specific APIRequest 234 if c.Client == nil { 235 c.Client = http.DefaultClient 236 } 237 resp, err := c.Client.Do(req) 238 if err != nil { 239 return err 240 } 241 defer resp.Body.Close() 242 243 // 7. Handle Response (Mirrors LexDo logic) 244 if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { 245 // Try to decode generic error body, fallback to status code error 246 var eb atpclient.ErrorBody 247 if err := json.NewDecoder(resp.Body).Decode(&eb); err != nil { 248 return fmt.Errorf("http request failed: %s", resp.Status) 249 } 250 return eb.APIError(resp.StatusCode) 251 } 252 253 if out == nil { 254 io.ReadAll(resp.Body) 255 return nil 256 } 257 258 if buf, ok := out.(*bytes.Buffer); ok { 259 if resp.ContentLength < 0 { 260 _, err := io.Copy(buf, resp.Body) 261 if err != nil { 262 return fmt.Errorf("reading response body: %w", err) 263 } 264 } else { 265 n, err := io.CopyN(buf, resp.Body, resp.ContentLength) 266 if err != nil { 267 return fmt.Errorf("reading length delimited response body (%d < %d): %w", n, resp.ContentLength, err) 268 } 269 } 270 } else { 271 if err := json.NewDecoder(resp.Body).Decode(out); err != nil { 272 return fmt.Errorf("failed decoding JSON response body: %w", err) 273 } 274 } 275 276 return nil 277}