Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/localhost-dev-is-back 201 lines 6.2 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "net/http" 10 "strings" 11 12 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 13 lexutil "github.com/bluesky-social/indigo/lex/util" 14 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/ipfs/go-cid" 17 "github.com/labstack/echo/v4" 18 "github.com/streamplace/oatproxy/pkg/oatproxy" 19 "go.opentelemetry.io/otel" 20 "stream.place/streamplace/pkg/atproto" 21 "stream.place/streamplace/pkg/log" 22) 23 24func resolveRepoService(ctx context.Context, repo string) (string, string, string, error) { 25 did := repo 26 var err error 27 if !strings.HasPrefix(repo, "did:") { 28 did, err = oatproxy.ResolveHandle(ctx, repo) 29 if err != nil { 30 return "", "", "", fmt.Errorf("failed to resolve handle %q: %w", repo, err) 31 } 32 } 33 34 service, handle, err := oatproxy.ResolveService(ctx, did) 35 if err != nil { 36 return "", "", "", fmt.Errorf("failed to resolve service for did %q: %w", did, err) 37 } 38 39 return did, service, handle, nil 40} 41 42var maxBlobSize int64 = 1024 * 1024 * 10 // 10MB 43 44func (s *Server) handleComAtprotoRepoUploadBlob(ctx context.Context, r io.Reader, contentType string) (*comatprototypes.RepoUploadBlob_Output, error) { 45 ctx, span := otel.Tracer("server").Start(ctx, "handleComAtprotoRepoUploadBlob") 46 defer span.End() 47 48 session, client := oatproxy.GetOAuthSession(ctx) 49 if session == nil { 50 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 51 } 52 53 // we need to buffer the blob so we can successfully retry upon dpop nonce changes 54 var err error 55 buf := bytes.Buffer{} 56 _, err = io.CopyN(&buf, r, maxBlobSize+1) 57 if err == nil { 58 return nil, echo.NewHTTPError(http.StatusBadRequest, "blob size exceeds max size") 59 } 60 if !errors.Is(err, io.EOF) { 61 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to copy reader to buffer") 62 } 63 64 var out comatprototypes.RepoUploadBlob_Output 65 66 err = client.Do(ctx, xrpc.Procedure, contentType, "com.atproto.repo.uploadBlob", nil, bytes.NewReader(buf.Bytes()), &out) 67 68 if err != nil { 69 log.Error(ctx, "upstream xrpc error", "error", err) 70 return nil, err 71 } 72 73 return &out, nil 74} 75 76func (s *Server) handleComAtprotoRepoDescribeRepo(ctx context.Context, repo string) (*comatprototypes.RepoDescribeRepo_Output, error) { 77 isLocal, svc, err := s.isLocalPDS(ctx, repo) 78 if err != nil { 79 return nil, fmt.Errorf("error checking for local PDS: %w", err) 80 } 81 if !isLocal { 82 var out comatprototypes.RepoDescribeRepo_Output 83 params := make(map[string]interface{}) 84 params["repo"] = repo 85 86 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.describeRepo", params, &out) 87 if err != nil { 88 log.Error(ctx, "upstream xrpc error", "error", err) 89 return nil, err 90 } 91 return &out, nil 92 93 } 94 95 return &comatprototypes.RepoDescribeRepo_Output{ 96 Handle: s.cli.MyDID(), 97 Did: s.cli.MyDID(), 98 DidDoc: atproto.DIDDoc(s.cli.PublicHost), 99 Collections: []string{ 100 "com.atproto.lexicon.schema", 101 }, 102 HandleIsCorrect: true, 103 }, nil 104} 105 106func (s *Server) handleComAtprotoRepoListRecords(ctx context.Context, collection string, cursor string, limit int, repo string, reverse *bool) (*comatprototypes.RepoListRecords_Output, error) { 107 isLocal, svc, err := s.isLocalPDS(ctx, repo) 108 if err != nil { 109 return nil, fmt.Errorf("error checking for local PDS: %w", err) 110 } 111 if !isLocal { 112 var out comatprototypes.RepoListRecords_Output 113 params := make(map[string]interface{}) 114 params["collection"] = collection 115 if cursor != "" { 116 params["cursor"] = cursor 117 } 118 if limit != 0 { 119 params["limit"] = limit 120 } 121 if reverse != nil { 122 params["reverse"] = *reverse 123 } 124 params["repo"] = repo 125 126 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.listRecords", params, &out) 127 if err != nil { 128 log.Error(ctx, "upstream xrpc error", "error", err) 129 return nil, err 130 } 131 return &out, nil 132 } 133 134 r, ses, err := atproto.OpenLexiconRepo(ctx) 135 if err != nil { 136 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err) 137 } 138 out := &comatprototypes.RepoListRecords_Output{ 139 Records: []*comatprototypes.RepoListRecords_Record{}, 140 } 141 err = r.ForEach(ctx, "", func(rkey string, c cid.Cid) error { 142 val, err := atproto.GetRecordCBOR(ctx, ses, c, collection, rkey) 143 if err != nil { 144 return fmt.Errorf("handleComAtprotoRepoListRecords: failed to get record for collection %q, rkey %q: %w", collection, rkey, err) 145 } 146 out.Records = append(out.Records, &comatprototypes.RepoListRecords_Record{ 147 Uri: fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey), 148 Cid: c.String(), 149 Value: &lexutil.LexiconTypeDecoder{Val: val}, 150 }) 151 152 return nil 153 }) 154 if err != nil { 155 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: error iterating records for collection %q: %w", collection, err) 156 } 157 return out, nil 158} 159 160func (s *Server) handleComAtprotoRepoGetRecord(ctx context.Context, c string, collection string, repo string, rkey string) (*comatprototypes.RepoGetRecord_Output, error) { 161 isLocal, svc, err := s.isLocalPDS(ctx, repo) 162 if err != nil { 163 return nil, fmt.Errorf("error checking for local PDS: %w", err) 164 } 165 if !isLocal { 166 var out comatprototypes.RepoGetRecord_Output 167 params := make(map[string]interface{}) 168 params["repo"] = repo 169 params["collection"] = collection 170 params["rkey"] = rkey 171 if c != "" { 172 params["cid"] = c 173 } 174 175 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.getRecord", params, &out) 176 if err != nil { 177 log.Error(ctx, "upstream xrpc error", "error", err) 178 return nil, err 179 } 180 return &out, nil 181 } 182 183 r, ses, err := atproto.OpenLexiconRepo(ctx) 184 if err != nil { 185 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) 186 } 187 outCID, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", collection, rkey)) 188 if err != nil { 189 return nil, err 190 } 191 rec, err := atproto.GetRecordCBOR(ctx, ses, outCID, collection, rkey) 192 if err != nil { 193 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record: %w", err) 194 } 195 str := outCID.String() 196 return &comatprototypes.RepoGetRecord_Output{ 197 Uri: fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey), 198 Cid: &str, 199 Value: &lexutil.LexiconTypeDecoder{Val: rec}, 200 }, nil 201}