go scratch code for atproto
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'main' into backfill

+459
+17
atproto/heap/cid.go
··· 1 + package heap 2 + 3 + import ( 4 + "github.com/ipfs/go-cid" 5 + "github.com/multiformats/go-multihash" 6 + ) 7 + 8 + func computeCID(b []byte) (*cid.Cid, error) { 9 + // TODO: not sure why this would ever fail; could we ignore or panic? 10 + // TODO: is there a more performant way to call SHA256, then wrap? 11 + builder := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256) 12 + c, err := builder.Sum(b) 13 + if err != nil { 14 + return nil, err 15 + } 16 + return &c, err 17 + }
+108
atproto/heap/examples_test.go
··· 1 + package heap 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + 9 + "github.com/bluesky-social/indigo/atproto/repo" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + ) 12 + 13 + func ExampleNetClient_GetRepoCAR() { 14 + 15 + ctx := context.Background() 16 + nc := NewNetClient() 17 + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") 18 + 19 + stream, err := nc.GetRepoCAR(ctx, did) 20 + if err != nil { 21 + panic("failed to download CAR: " + err.Error()) 22 + } 23 + defer stream.Close() 24 + 25 + // NOTE: could also use LoadCommitFromCAR 26 + commit, _, err := repo.LoadRepoFromCAR(ctx, stream) 27 + if err != nil { 28 + panic("failed to parse CAR: " + err.Error()) 29 + } 30 + 31 + ident, _ := nc.Dir.LookupDID(ctx, did) 32 + pub, _ := ident.PublicKey() 33 + 34 + if err := commit.VerifySignature(pub); err != nil { 35 + panic("failed to verify commit signature: " + err.Error()) 36 + } 37 + 38 + fmt.Println(commit.DID) 39 + // did:plc:ewvi7nxzyoun6zhxrhs64oiz 40 + } 41 + 42 + func ExampleNetClient_GetBlob() { 43 + 44 + ctx := context.Background() 45 + nc := NewNetClient() 46 + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") 47 + cid := syntax.CID("bafkreieya7iitpu4okjtm7iexiwikj7t63ttlthad32ojsvjqhqbc3iwmi") 48 + 49 + buf := bytes.Buffer{} 50 + if err := nc.GetBlob(ctx, did, cid, &buf); err != nil { 51 + panic("failed to download blob: " + err.Error()) 52 + } 53 + 54 + fmt.Println(buf.Len()) 55 + // 518394 56 + } 57 + 58 + func ExampleNetClient_GetAccountStatus() { 59 + 60 + ctx := context.Background() 61 + nc := NewNetClient() 62 + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") 63 + 64 + active, status, err := nc.GetAccountStatus(ctx, did) 65 + if err != nil { 66 + panic("failed to check account status: " + err.Error()) 67 + } 68 + 69 + fmt.Printf("active=%t status=%s\n", active, status) 70 + // active=true status= 71 + } 72 + 73 + func ExampleNetClient_GetRecordUnverified() { 74 + 75 + ctx := context.Background() 76 + nc := NewNetClient() 77 + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") 78 + collection := syntax.NSID("app.bsky.actor.profile") 79 + rkey := syntax.RecordKey("self") 80 + 81 + raw, _, err := nc.GetRecordUnverified(ctx, did, collection, rkey) 82 + if err != nil { 83 + panic("failed to fetch record: " + err.Error()) 84 + } 85 + var record map[string]any 86 + _ = json.Unmarshal(*raw, &record) 87 + 88 + fmt.Println(record["displayName"]) 89 + // AT Protocol Developers 90 + } 91 + 92 + func ExampleNetClient_GetRecord() { 93 + 94 + ctx := context.Background() 95 + nc := NewNetClient() 96 + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") 97 + collection := syntax.NSID("app.bsky.actor.profile") 98 + rkey := syntax.RecordKey("self") 99 + 100 + var record map[string]any 101 + _, err := nc.GetRecord(ctx, did, collection, rkey, &record) 102 + if err != nil { 103 + panic("failed to fetch record: " + err.Error()) 104 + } 105 + 106 + fmt.Println(record["displayName"]) 107 + // Output: AT Protocol Developers 108 + }
+178
atproto/heap/netclient.go
··· 1 + package heap 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "errors" 8 + "fmt" 9 + "io" 10 + "log/slog" 11 + "net/http" 12 + 13 + "github.com/bluesky-social/indigo/atproto/identity" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + ) 16 + 17 + type NetClient struct { 18 + Client *http.Client 19 + // NOTE: maybe should use a "resolver" which doesn't do handle resolution? or leave that to calling code to configure 20 + Dir identity.Directory 21 + UserAgent string 22 + } 23 + 24 + func NewNetClient() *NetClient { 25 + return &NetClient{ 26 + // TODO: maybe custom client: SSRF, retries, timeout 27 + Client: http.DefaultClient, 28 + Dir: identity.DefaultDirectory(), 29 + UserAgent: "cobalt-netclient", 30 + } 31 + } 32 + 33 + // Fetches repo export (CAR file). Calling code is responsible for closing the returned [io.ReadCloser] on success (often an HTTP response body). Does not verify signatures or CAR format or structure in any way. 34 + func (nc *NetClient) GetRepoCAR(ctx context.Context, did syntax.DID) (io.ReadCloser, error) { 35 + ident, err := nc.Dir.LookupDID(ctx, did) 36 + if err != nil { 37 + return nil, err 38 + } 39 + host := ident.PDSEndpoint() 40 + if host == "" { 41 + return nil, fmt.Errorf("account has no PDS host registered: %s", did.String()) 42 + } 43 + // TODO: validate host 44 + // TODO: DID escaping (?) 45 + u := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did) 46 + 47 + slog.Debug("downloading repo CAR", "did", did, "url", u) 48 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 49 + if err != nil { 50 + return nil, err 51 + } 52 + if nc.UserAgent != "" { 53 + req.Header.Set("User-Agent", nc.UserAgent) 54 + } 55 + req.Header.Set("Accept", "application/vnd.ipld.car") 56 + 57 + resp, err := nc.Client.Do(req) 58 + if err != nil { 59 + return nil, fmt.Errorf("fetching repo CAR file (%s): %w", did, err) 60 + } 61 + 62 + if resp.StatusCode != http.StatusOK { 63 + resp.Body.Close() 64 + return nil, fmt.Errorf("HTTP error fetching repo CAR file (%s): %d", did, resp.StatusCode) 65 + } 66 + 67 + return resp.Body, nil 68 + } 69 + 70 + // Resolves and fetches blob from the network. Calling code must close the returned [io.ReadCloser] (eg, HTTP response body). Does not verify CID. 71 + func (nc *NetClient) GetBlobReader(ctx context.Context, did syntax.DID, cid syntax.CID) (io.ReadCloser, error) { 72 + ident, err := nc.Dir.LookupDID(ctx, did) 73 + if err != nil { 74 + return nil, err 75 + } 76 + host := ident.PDSEndpoint() 77 + if host == "" { 78 + return nil, fmt.Errorf("account has no PDS host registered: %s", did.String()) 79 + } 80 + // TODO: validate host 81 + // TODO: DID escaping (?) 82 + u := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", host, did, cid) 83 + 84 + slog.Debug("downloading blob", "did", did, "cid", cid, "url", u) 85 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 86 + if err != nil { 87 + return nil, err 88 + } 89 + if nc.UserAgent != "" { 90 + req.Header.Set("User-Agent", nc.UserAgent) 91 + } 92 + req.Header.Set("Accept", "*/*") 93 + 94 + resp, err := nc.Client.Do(req) 95 + if err != nil { 96 + return nil, fmt.Errorf("fetching blob (%s, %s): %w", did, cid, err) 97 + } 98 + 99 + if resp.StatusCode != http.StatusOK { 100 + resp.Body.Close() 101 + return nil, fmt.Errorf("HTTP error fetching blob (%s, %s): %d", did, cid, resp.StatusCode) 102 + } 103 + 104 + return resp.Body, nil 105 + } 106 + 107 + var ErrMismatchedBlobCID = errors.New("mismatched blob CID") 108 + 109 + // Fetches blob, writes in to provided buffer, and verified CID hash. 110 + func (nc *NetClient) GetBlob(ctx context.Context, did syntax.DID, cid syntax.CID, buf *bytes.Buffer) error { 111 + stream, err := nc.GetBlobReader(ctx, did, cid) 112 + if err != nil { 113 + return err 114 + } 115 + defer stream.Close() 116 + 117 + if _, err := io.Copy(buf, stream); err != nil { 118 + return err 119 + } 120 + 121 + c, err := computeCID(buf.Bytes()) 122 + if err != nil { 123 + return err 124 + } 125 + 126 + if c.String() != cid.String() { 127 + return ErrMismatchedBlobCID 128 + } 129 + return nil 130 + } 131 + 132 + type repoStatusResp struct { 133 + Active bool `json:"active"` 134 + DID string `json:"did"` 135 + Status string `json:"status,omitempty"` 136 + } 137 + 138 + // Fetches account status. Returns a boolean indicating active state, and a string describing any non-active status. 139 + func (nc *NetClient) GetAccountStatus(ctx context.Context, did syntax.DID) (active bool, status string, err error) { 140 + ident, err := nc.Dir.LookupDID(ctx, did) 141 + if err != nil { 142 + return false, "", err 143 + } 144 + host := ident.PDSEndpoint() 145 + if host == "" { 146 + return false, "", fmt.Errorf("account has no PDS host registered: %s", did.String()) 147 + } 148 + // TODO: validate host 149 + // TODO: DID escaping (?) 150 + u := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepoStatus?did=%s", host, did) 151 + 152 + slog.Debug("fetching account status", "did", did, "url", u) 153 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 154 + if err != nil { 155 + return false, "", err 156 + } 157 + if nc.UserAgent != "" { 158 + req.Header.Set("User-Agent", nc.UserAgent) 159 + } 160 + req.Header.Set("Accept", "application/json") 161 + 162 + resp, err := nc.Client.Do(req) 163 + if err != nil { 164 + return false, "", fmt.Errorf("fetching account status (%s): %w", did, err) 165 + } 166 + defer resp.Body.Close() 167 + 168 + if resp.StatusCode != http.StatusOK { 169 + return false, "", fmt.Errorf("HTTP error fetching account status (%s): %d", did, resp.StatusCode) 170 + } 171 + 172 + var rsr repoStatusResp 173 + if err := json.NewDecoder(resp.Body).Decode(&rsr); err != nil { 174 + return false, "", fmt.Errorf("failed decoding account status response: %w", err) 175 + } 176 + 177 + return rsr.Active, rsr.Status, nil 178 + }
+156
atproto/heap/record.go
··· 1 + package heap 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "net/http" 10 + 11 + "github.com/bluesky-social/indigo/atproto/data" 12 + "github.com/bluesky-social/indigo/atproto/repo" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + ) 15 + 16 + type repoRecordResp struct { 17 + URI string `json:"uri"` 18 + CID syntax.CID `json:"cid"` 19 + Value json.RawMessage `json:"value"` 20 + } 21 + 22 + // Fetches record JSON using com.atproto.repo.getRecord, and returns record as [json.RawMessage] and the CID (as string). 23 + func (nc *NetClient) GetRecordUnverified(ctx context.Context, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) (*json.RawMessage, syntax.CID, error) { 24 + ident, err := nc.Dir.LookupDID(ctx, did) 25 + if err != nil { 26 + return nil, "", err 27 + } 28 + host := ident.PDSEndpoint() 29 + if host == "" { 30 + return nil, "", fmt.Errorf("account has no PDS host registered: %s", did.String()) 31 + } 32 + // TODO: validate host 33 + // TODO: DID escaping (?) 34 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", host, did, collection, rkey) 35 + 36 + slog.Debug("fetching record JSON", "did", did, "url", u) 37 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 38 + if err != nil { 39 + return nil, "", err 40 + } 41 + if nc.UserAgent != "" { 42 + req.Header.Set("User-Agent", nc.UserAgent) 43 + } 44 + req.Header.Set("Accept", "application/json") 45 + 46 + resp, err := nc.Client.Do(req) 47 + if err != nil { 48 + return nil, "", fmt.Errorf("fetching record JSON (%s): %w", did, err) 49 + } 50 + defer resp.Body.Close() 51 + 52 + if resp.StatusCode != http.StatusOK { 53 + return nil, "", fmt.Errorf("HTTP error fetching record JSON (%s): %d", did, resp.StatusCode) 54 + } 55 + 56 + var rrr repoRecordResp 57 + if err := json.NewDecoder(resp.Body).Decode(&rrr); err != nil { 58 + return nil, "", fmt.Errorf("failed decoding account status response: %w", err) 59 + } 60 + 61 + return &rrr.Value, rrr.CID, nil 62 + } 63 + 64 + // Fetches a record "proof" using com.atproto.sync.getRecord. Verifies signature and merkel chain. Copies record content in out 'out' parameter. 65 + // 66 + // If out is nil, record data is not returned. If it is [bytes.Buffer], the record CBOR is copied in. Otherwise, the record is transformed to JSON and Unmarshalled in to provided output, which could be a pointer to a struct, [json.RawMessage], `map[string]any`, etc. 67 + // 68 + // TODO: this might not be fully validating MST tree and record CID hashes or encoding yet 69 + func (nc *NetClient) GetRecord(ctx context.Context, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey, out any) (syntax.CID, error) { 70 + // TODO: "GetRecordProof" variant, which just returns CAR as io.ReadCloser? 71 + ident, err := nc.Dir.LookupDID(ctx, did) 72 + if err != nil { 73 + return "", err 74 + } 75 + pub, err := ident.PublicKey() 76 + if err != nil { 77 + return "", err 78 + } 79 + host := ident.PDSEndpoint() 80 + if host == "" { 81 + return "", fmt.Errorf("account has no PDS host registered: %s", did.String()) 82 + } 83 + // TODO: validate host 84 + // TODO: DID escaping (?) 85 + u := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRecord?did=%s&collection=%s&rkey=%s", host, did, collection, rkey) 86 + 87 + slog.Debug("fetching record proof", "did", did, "url", u) 88 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 89 + if err != nil { 90 + return "", err 91 + } 92 + if nc.UserAgent != "" { 93 + req.Header.Set("User-Agent", nc.UserAgent) 94 + } 95 + req.Header.Set("Accept", "application/vnd.ipld.car") 96 + 97 + resp, err := nc.Client.Do(req) 98 + if err != nil { 99 + return "", fmt.Errorf("fetching record proof (%s): %w", did, err) 100 + } 101 + defer resp.Body.Close() 102 + 103 + if resp.StatusCode != http.StatusOK { 104 + return "", fmt.Errorf("HTTP error fetching record proof (%s): %d", did, resp.StatusCode) 105 + } 106 + 107 + // TODO: re-confirm if loading tree re-checks all CIDs; or if we need to re-compute the tree data CID 108 + commit, rp, err := repo.LoadRepoFromCAR(ctx, resp.Body) 109 + if err != nil { 110 + return "", fmt.Errorf("failed to parse record proof CAR (%s): %w", did, err) 111 + } 112 + 113 + // NOTE: LoadRepoFromCAR calls commit.VerifyStructure() internally 114 + 115 + if err := commit.VerifySignature(pub); err != nil { 116 + return "", fmt.Errorf("failed to verify record proof signature (%s): %w", did, err) 117 + } 118 + 119 + rbytes, rcid, err := rp.GetRecordBytes(ctx, collection, rkey) 120 + if err != nil { 121 + return "", fmt.Errorf("failed to read record from proof CAR (%s): %w", did, err) 122 + } 123 + cidStr := syntax.CID(rcid.String()) 124 + 125 + // TODO: `GetRecordBytes` does not currently verify record CID, but unpacking CAR file should have done that? but need to confirm CAR implementation does this 126 + 127 + // check that record CBOR is valid, even if we don't return it 128 + rdata, err := data.UnmarshalCBOR(rbytes) 129 + if err != nil { 130 + return "", fmt.Errorf("failed to parse record CBOR (%s): %w", did, err) 131 + } 132 + 133 + switch out := out.(type) { 134 + case nil: 135 + // if output isn't captured, bail out early 136 + return cidStr, nil 137 + case *bytes.Buffer: 138 + // simply copy data over 139 + out.Reset() 140 + _, err := out.Write(rbytes) 141 + if err != nil { 142 + return "", err 143 + } 144 + return cidStr, nil 145 + default: 146 + // attempt to unmarshal from json 147 + jsonBytes, err := json.Marshal(rdata) 148 + if err != nil { 149 + return "", err 150 + } 151 + if err := json.Unmarshal(jsonBytes, out); err != nil { 152 + return "", fmt.Errorf("failed unmarhsaling record (%s): %w", did, err) 153 + } 154 + return cidStr, nil 155 + } 156 + }