bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm

microcosm api client

Changed files
+590
microcosm
constellation
slingshot
spacedust
+14
main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 "log" 6 7 "net/http" 7 8 "os" 8 9 "time" 9 10 11 + "tangled.org/whey.party/rdcs/microcosm/constellation" 12 + "tangled.org/whey.party/rdcs/microcosm/slingshot" 10 13 "tangled.org/whey.party/rdcs/sticket" 14 + 11 15 // "github.com/bluesky-social/indigo/atproto/atclient" 12 16 // comatproto "github.com/bluesky-social/indigo/api/atproto" 13 17 // appbsky "github.com/bluesky-social/indigo/api/bsky" 14 18 // "github.com/bluesky-social/indigo/atproto/atclient" 15 19 // "github.com/bluesky-social/indigo/atproto/identity" 16 20 // "github.com/bluesky-social/indigo/atproto/syntax" 21 + "github.com/bluesky-social/indigo/api/agnostic" 17 22 // "github.com/bluesky-social/jetstream/pkg/models" 18 23 ) 19 24 ··· 27 32 func main() { 28 33 fmt.Fprintf(os.Stdout, "RDCS started") 29 34 35 + ctx := context.Background() 30 36 mailbox := sticket.New() 37 + sl := slingshot.NewSlingshot(SLINGSHOT_URL) 38 + cs := constellation.NewConstellation(CONSTELLATION_URL) 39 + // spacedust is type definitions only 40 + // jetstream types is probably available from jetstream/pkg/models 41 + 42 + responsewow, _ := agnostic.RepoGetRecord(ctx, sl, "", "app.bsky.feed.profile", "did:plc:44ybard66vv44zksje25o7dz", "self") 43 + 44 + fmt.Fprintf(os.Stdout, responsewow.Uri) 31 45 32 46 http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { 33 47 mailbox.HandleWS(&w, r)
+224
microcosm/constellation/constellation.go
··· 1 + package constellation 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/lex/util" 7 + "tangled.org/whey.party/rdcs/microcosm" 8 + ) 9 + 10 + func NewConstellation(host string) *microcosm.MicrocosmClient { 11 + return microcosm.NewMicrocosm(host) 12 + } 13 + 14 + type ConstellationRecord struct { 15 + Did string `json:"did" cborgen:"did"` 16 + Collection string `json:"collection" cborgen:"collection"` 17 + Rkey string `json:"rkey" cborgen:"rkey"` 18 + } 19 + 20 + type GetBacklinks_Output struct { 21 + Total int `json:"total" cborgen:"total"` 22 + Records []ConstellationRecord `json:"records" cborgen:"records"` 23 + Cursor *string `json:"cursor" cborgen:"cursor"` 24 + } 25 + 26 + func GetBacklinks(ctx context.Context, c util.LexClient, subject string, source string, did []string, limit *int, cursor *string) (*GetBacklinks_Output, error) { 27 + var out GetBacklinks_Output 28 + 29 + params := map[string]interface{}{ 30 + "subject": subject, 31 + "source": source, 32 + } 33 + 34 + if len(did) > 0 { 35 + params["did"] = did 36 + } 37 + 38 + if limit != nil { 39 + params["limit"] = *limit 40 + } 41 + 42 + if cursor != nil { 43 + params["cursor"] = *cursor 44 + } 45 + 46 + if err := c.LexDo(ctx, util.Query, "", "blue.microcosm.links.getBacklinks", params, nil, &out); err != nil { 47 + return nil, err 48 + } 49 + 50 + return &out, nil 51 + } 52 + 53 + type TooManyItem struct { 54 + Subject string `json:"subject" cborgen:"subject"` 55 + Total int `json:"total" cborgen:"total"` 56 + Distinct int `json:"distinct" cborgen:"distinct"` 57 + } 58 + type GetTooManyCounts_Output struct { 59 + CountsByOtherSubject []TooManyItem `json:"counts_by_other_subject" cborgen:"counts_by_other_subject"` 60 + Cursor *string `json:"cursor" cborgen:"cursor"` 61 + } 62 + 63 + func GetTooManyCounts(ctx context.Context, c util.LexClient, subject string, source string, pathToOther string, did []string, otherSubject []string, limit *int, cursor *string) (*GetTooManyCounts_Output, error) { 64 + var out GetTooManyCounts_Output 65 + 66 + params := map[string]interface{}{ 67 + "subject": subject, 68 + "source": source, 69 + "pathToOther": pathToOther, 70 + } 71 + 72 + if len(did) > 0 { 73 + params["did"] = did 74 + } 75 + 76 + if len(otherSubject) > 0 { 77 + params["otherSubject"] = otherSubject 78 + } 79 + 80 + if limit != nil { 81 + params["limit"] = *limit 82 + } 83 + 84 + if cursor != nil { 85 + params["cursor"] = *cursor 86 + } 87 + 88 + if err := c.LexDo(ctx, util.Query, "", "blue.microcosm.links.getManyToManyCounts", params, nil, &out); err != nil { 89 + return nil, err 90 + } 91 + 92 + return &out, nil 93 + } 94 + 95 + // todo: implement the non-xrpc routes. 96 + // prerequisites: a bespoke new api request system that does not use LexDo 97 + 98 + type LegacyLinks_Output struct { 99 + Total int `json:"total" cborgen:"total"` 100 + LinkingRecords []ConstellationRecord `json:"linking_records" cborgen:"linking_records"` 101 + Cursor *string `json:"cursor" cborgen:"cursor"` 102 + } 103 + 104 + // Deprecated: use GetBackLinks 105 + func LegacyLinks(ctx context.Context, c *microcosm.MicrocosmClient, target string, collection string, path string, did []string, limit *int, cursor *string) (*LegacyLinks_Output, error) { 106 + var out LegacyLinks_Output 107 + 108 + params := map[string]interface{}{ 109 + "target": target, 110 + "collection": collection, 111 + "path": path, 112 + } 113 + 114 + if len(did) > 0 { 115 + params["did"] = did 116 + } 117 + 118 + if limit != nil { 119 + params["limit"] = *limit 120 + } 121 + 122 + if cursor != nil { 123 + params["cursor"] = *cursor 124 + } 125 + 126 + if err := c.PathDo(ctx, util.Query, "", "/links", params, nil, &out); err != nil { 127 + return nil, err 128 + } 129 + 130 + return &out, nil 131 + } 132 + 133 + type LegacyLinksDistinctDids_Output struct { 134 + Total int `json:"total" cborgen:"total"` 135 + LinkingRecords []string `json:"linking_records" cborgen:"linking_records"` 136 + Cursor *string `json:"cursor" cborgen:"cursor"` 137 + } 138 + 139 + func LegacyLinksDistinctDids(ctx context.Context, c *microcosm.MicrocosmClient, target string, collection string, path string, cursor *string) (*LegacyLinksDistinctDids_Output, error) { 140 + var out LegacyLinksDistinctDids_Output 141 + 142 + params := map[string]interface{}{ 143 + "target": target, 144 + "collection": collection, 145 + "path": path, 146 + } 147 + 148 + if cursor != nil { 149 + params["cursor"] = *cursor 150 + } 151 + 152 + if err := c.PathDo(ctx, util.Query, "", "/links/distinct-dids", params, nil, &out); err != nil { 153 + return nil, err 154 + } 155 + 156 + return &out, nil 157 + } 158 + 159 + type ConstellationCount_Output struct { 160 + Total int `json:"total" cborgen:"total"` 161 + } 162 + 163 + func LegacyLinksCount(ctx context.Context, c *microcosm.MicrocosmClient, target string, collection string, path string, cursor *string) (*ConstellationCount_Output, error) { 164 + var out ConstellationCount_Output 165 + 166 + params := map[string]interface{}{ 167 + "target": target, 168 + "collection": collection, 169 + "path": path, 170 + } 171 + 172 + if cursor != nil { 173 + params["cursor"] = *cursor 174 + } 175 + 176 + if err := c.PathDo(ctx, util.Query, "", "/links/count", params, nil, &out); err != nil { 177 + return nil, err 178 + } 179 + 180 + return &out, nil 181 + } 182 + 183 + func LegacyLinksCountDistinctDids(ctx context.Context, c *microcosm.MicrocosmClient, target string, collection string, path string, cursor *string) (*ConstellationCount_Output, error) { 184 + var out ConstellationCount_Output 185 + 186 + params := map[string]interface{}{ 187 + "target": target, 188 + "collection": collection, 189 + "path": path, 190 + } 191 + 192 + if cursor != nil { 193 + params["cursor"] = *cursor 194 + } 195 + 196 + if err := c.PathDo(ctx, util.Query, "", "/links/count/distinct-dids", params, nil, &out); err != nil { 197 + return nil, err 198 + } 199 + 200 + return &out, nil 201 + } 202 + 203 + type LinkStats struct { 204 + Records int `json:"records"` 205 + DistinctDIDs int `json:"distinct_dids"` 206 + } 207 + 208 + type ConstellationAll_Output struct { 209 + Links map[string]map[string]LinkStats `json:"links"` 210 + } 211 + 212 + func LegacyLinksAll(ctx context.Context, c *microcosm.MicrocosmClient, target string) (*ConstellationAll_Output, error) { 213 + var out ConstellationAll_Output 214 + 215 + params := map[string]interface{}{ 216 + "target": target, 217 + } 218 + 219 + if err := c.PathDo(ctx, util.Query, "", "/links/all", params, nil, &out); err != nil { 220 + return nil, err 221 + } 222 + 223 + return &out, nil 224 + }
+277
microcosm/microcosm.go
··· 1 + package microcosm 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "net/url" 11 + "strings" 12 + 13 + atpclient "github.com/bluesky-social/indigo/atproto/atclient" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + ) 16 + 17 + type MicrocosmClient struct { 18 + // Inner HTTP client. May be customized after the overall [MicrocosmClient] struct is created; for example to set a default request timeout. 19 + Client *http.Client 20 + 21 + // Host URL prefix: scheme, hostname, and port. This field is required. 22 + Host string 23 + 24 + // Optional HTTP headers which will be included in all requests. Only a single value per key is included; request-level headers will override any client-level defaults. 25 + Headers http.Header 26 + } 27 + 28 + func NewMicrocosm(host string) *MicrocosmClient { 29 + // validate cfg if needed 30 + return &MicrocosmClient{ 31 + Client: http.DefaultClient, 32 + Host: host, 33 + Headers: map[string][]string{ 34 + "User-Agent": []string{"microcosm-rdcs"}, 35 + }, 36 + } 37 + } 38 + 39 + func (c *MicrocosmClient) LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error { 40 + // some of the code here is copied from indigo:xrpc/xrpc.go 41 + 42 + nsid, err := syntax.ParseNSID(endpoint) 43 + if err != nil { 44 + return err 45 + } 46 + 47 + var body io.Reader 48 + if bodyData != nil { 49 + if rr, ok := bodyData.(io.Reader); ok { 50 + body = rr 51 + } else { 52 + b, err := json.Marshal(bodyData) 53 + if err != nil { 54 + return err 55 + } 56 + 57 + body = bytes.NewReader(b) 58 + if inputEncoding == "" { 59 + inputEncoding = "application/json" 60 + } 61 + } 62 + } 63 + 64 + req := NewAPIRequest(method, nsid, body) 65 + 66 + if inputEncoding != "" { 67 + req.Headers.Set("Content-Type", inputEncoding) 68 + } 69 + 70 + if params != nil { 71 + qp, err := atpclient.ParseParams(params) 72 + if err != nil { 73 + return err 74 + } 75 + req.QueryParams = qp 76 + } 77 + 78 + resp, err := c.Do(ctx, req) 79 + if err != nil { 80 + return err 81 + } 82 + defer resp.Body.Close() 83 + 84 + if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { 85 + var eb atpclient.ErrorBody 86 + if err := json.NewDecoder(resp.Body).Decode(&eb); err != nil { 87 + return &atpclient.APIError{StatusCode: resp.StatusCode} 88 + } 89 + return eb.APIError(resp.StatusCode) 90 + } 91 + 92 + if out == nil { 93 + // drain body before returning 94 + io.ReadAll(resp.Body) 95 + return nil 96 + } 97 + 98 + if buf, ok := out.(*bytes.Buffer); ok { 99 + if resp.ContentLength < 0 { 100 + _, err := io.Copy(buf, resp.Body) 101 + if err != nil { 102 + return fmt.Errorf("reading response body: %w", err) 103 + } 104 + } else { 105 + n, err := io.CopyN(buf, resp.Body, resp.ContentLength) 106 + if err != nil { 107 + return fmt.Errorf("reading length delimited response body (%d < %d): %w", n, resp.ContentLength, err) 108 + } 109 + } 110 + } else { 111 + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { 112 + return fmt.Errorf("failed decoding JSON response body: %w", err) 113 + } 114 + } 115 + 116 + return nil 117 + } 118 + 119 + func (c *MicrocosmClient) Do(ctx context.Context, req *atpclient.APIRequest) (*http.Response, error) { 120 + 121 + if c.Client == nil { 122 + c.Client = http.DefaultClient 123 + } 124 + 125 + httpReq, err := req.HTTPRequest(ctx, c.Host, c.Headers) 126 + if err != nil { 127 + return nil, err 128 + } 129 + 130 + var resp *http.Response 131 + resp, err = c.Client.Do(httpReq) 132 + if err != nil { 133 + return nil, err 134 + } 135 + return resp, nil 136 + } 137 + 138 + func NewAPIRequest(method string, endpoint syntax.NSID, body io.Reader) *atpclient.APIRequest { 139 + req := atpclient.APIRequest{ 140 + Method: method, 141 + Endpoint: endpoint, 142 + Headers: map[string][]string{}, 143 + QueryParams: map[string][]string{}, 144 + } 145 + 146 + // logic to turn "whatever io.Reader we are handed" in to something relatively re-tryable (using GetBody) 147 + if body != nil { 148 + // NOTE: http.NewRequestWithContext already handles GetBody() as well as ContentLength for specific types like bytes.Buffer and strings.Reader. We just want to add io.Seeker here, for things like files-on-disk. 149 + switch v := body.(type) { 150 + case io.Seeker: 151 + req.Body = io.NopCloser(body) 152 + req.GetBody = func() (io.ReadCloser, error) { 153 + v.Seek(0, 0) 154 + return io.NopCloser(body), nil 155 + } 156 + default: 157 + req.Body = body 158 + } 159 + } 160 + return &req 161 + } 162 + 163 + // PathDo performs an HTTP request to an arbitrary path (no /xrpc/ prefix enforcement). 164 + func (c *MicrocosmClient) PathDo(ctx context.Context, method string, inputEncoding string, path string, params map[string]any, bodyData any, out any) error { 165 + // 1. Prepare the request body 166 + var body io.Reader 167 + if bodyData != nil { 168 + if rr, ok := bodyData.(io.Reader); ok { 169 + body = rr 170 + } else { 171 + b, err := json.Marshal(bodyData) 172 + if err != nil { 173 + return err 174 + } 175 + body = bytes.NewReader(b) 176 + if inputEncoding == "" { 177 + inputEncoding = "application/json" 178 + } 179 + } 180 + } 181 + 182 + // 2. Construct the full URL 183 + // We handle slash joining carefully to avoid double slashes or missing slashes 184 + baseURL := strings.TrimRight(c.Host, "/") 185 + cleanPath := strings.TrimLeft(path, "/") 186 + fullURLStr := fmt.Sprintf("%s/%s", baseURL, cleanPath) 187 + 188 + // 3. Prepare Query Parameters 189 + // We reuse the indigo parser for convenience, but we must apply them manually to the URL 190 + var queryParams url.Values 191 + if params != nil { 192 + qp, err := atpclient.ParseParams(params) 193 + if err != nil { 194 + return err 195 + } 196 + queryParams = make(url.Values) 197 + for k, v := range qp { 198 + for _, val := range v { 199 + queryParams.Add(k, val) 200 + } 201 + } 202 + } 203 + 204 + if len(queryParams) > 0 { 205 + // Verify the URL parses correctly so we can attach params 206 + u, err := url.Parse(fullURLStr) 207 + if err != nil { 208 + return fmt.Errorf("invalid url: %w", err) 209 + } 210 + u.RawQuery = queryParams.Encode() 211 + fullURLStr = u.String() 212 + } 213 + 214 + // 4. Create the Standard HTTP Request 215 + req, err := http.NewRequestWithContext(ctx, method, fullURLStr, body) 216 + if err != nil { 217 + return err 218 + } 219 + 220 + // 5. Apply Headers 221 + // Apply default client headers 222 + for k, v := range c.Headers { 223 + for _, val := range v { 224 + req.Header.Add(k, val) 225 + } 226 + } 227 + // Apply specific content type if set 228 + if inputEncoding != "" { 229 + req.Header.Set("Content-Type", inputEncoding) 230 + } 231 + 232 + // 6. Execute Request 233 + // Note: We access c.Client directly because c.Do() expects an XRPC-specific APIRequest 234 + if c.Client == nil { 235 + c.Client = http.DefaultClient 236 + } 237 + resp, err := c.Client.Do(req) 238 + if err != nil { 239 + return err 240 + } 241 + defer resp.Body.Close() 242 + 243 + // 7. Handle Response (Mirrors LexDo logic) 244 + if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { 245 + // Try to decode generic error body, fallback to status code error 246 + var eb atpclient.ErrorBody 247 + if err := json.NewDecoder(resp.Body).Decode(&eb); err != nil { 248 + return fmt.Errorf("http request failed: %s", resp.Status) 249 + } 250 + return eb.APIError(resp.StatusCode) 251 + } 252 + 253 + if out == nil { 254 + io.ReadAll(resp.Body) 255 + return nil 256 + } 257 + 258 + if buf, ok := out.(*bytes.Buffer); ok { 259 + if resp.ContentLength < 0 { 260 + _, err := io.Copy(buf, resp.Body) 261 + if err != nil { 262 + return fmt.Errorf("reading response body: %w", err) 263 + } 264 + } else { 265 + n, err := io.CopyN(buf, resp.Body, resp.ContentLength) 266 + if err != nil { 267 + return fmt.Errorf("reading length delimited response body (%d < %d): %w", n, resp.ContentLength, err) 268 + } 269 + } 270 + } else { 271 + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { 272 + return fmt.Errorf("failed decoding JSON response body: %w", err) 273 + } 274 + } 275 + 276 + return nil 277 + }
+53
microcosm/slingshot/slingshot.go
··· 1 + package slingshot 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/api/agnostic" 7 + "github.com/bluesky-social/indigo/lex/util" 8 + "tangled.org/whey.party/rdcs/microcosm" 9 + ) 10 + 11 + func NewSlingshot(host string) *microcosm.MicrocosmClient { 12 + return microcosm.NewMicrocosm(host) 13 + } 14 + 15 + type ResolveMiniDoc_Output struct { 16 + Did string `json:"did" cborgen:"did"` 17 + Handle string `json:"handle" cborgen:"handle"` 18 + Pds string `json:"pds" cborgen:"pds"` 19 + SigningKey string `json:"signing_key" cborgen:"signing_key"` 20 + } 21 + 22 + // get record also works here, but you should import it from indigo/api/agnostic (RepoGetRecord) 23 + 24 + func RepoGetUriRecord(ctx context.Context, c util.LexClient, cid string, at_uri string) (*agnostic.RepoGetRecord_Output, error) { 25 + var out agnostic.RepoGetRecord_Output 26 + 27 + params := map[string]interface{}{ 28 + "at_uri": at_uri, 29 + } 30 + if cid != "" { 31 + params["cid"] = cid 32 + } 33 + if err := c.LexDo(ctx, util.Query, "", "com.bad-example.repo.getUriRecord", params, nil, &out); err != nil { 34 + return nil, err 35 + } 36 + 37 + return &out, nil 38 + } 39 + 40 + // resolve handle also works here, but you should import it from indigo/atproto/identity (ResolveHandle) 41 + 42 + func ResolveMiniDoc(ctx context.Context, c util.LexClient, repo string) (*ResolveMiniDoc_Output, error) { 43 + var out ResolveMiniDoc_Output 44 + 45 + params := map[string]interface{}{ 46 + "identifier": repo, 47 + } 48 + if err := c.LexDo(ctx, util.Query, "", "com.bad-example.identity.resolveMiniDoc", params, nil, &out); err != nil { 49 + return nil, err 50 + } 51 + 52 + return &out, nil 53 + }
+22
microcosm/spacedust/spacedust.go
··· 1 + package spacedust 2 + 3 + type Spacedust_Event struct { 4 + Kind string `json:"kind" cborgen:"kind,const=link"` 5 + Origin string `json:"origin" cborgen:"origin,const=live|replay|backfill"` 6 + Link Spacedust_Link `json:"link" cborgen:"link"` 7 + } 8 + 9 + type Spacedust_Link struct { 10 + Operation string `json:"operation" cborgen:"operation,const=create|delete"` 11 + Source string `json:"source" cborgen:"source"` 12 + SourceRecord string `json:"source_record" cborgen:"source_record"` 13 + SourceRev string `json:"source_rev" cborgen:"source_rev"` 14 + Subject string `json:"subject" cborgen:"subject"` 15 + } 16 + 17 + type Spacedust_Query struct { 18 + WantedSources []string `json:"wantedSources" cborgen:"wantedSources"` 19 + WantedSubjectDids []string `json:"wantedSubjectDids" cborgen:"wantedSubjectDids"` 20 + WantedSubjects []string `json:"wantedSubjects" cborgen:"wantedSubjects"` 21 + Instant bool `json:"instant" cborgen:"instant"` 22 + }