Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/workers-docs 159 lines 4.6 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "net/http" 10 "strings" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/labstack/echo/v4" 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "go.opentelemetry.io/otel" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/log" 20) 21 22func resolveRepoService(ctx context.Context, repo string) (string, string, string, error) { 23 did := repo 24 var err error 25 if !strings.HasPrefix(repo, "did:") { 26 did, err = oatproxy.ResolveHandle(ctx, repo) 27 if err != nil { 28 return "", "", "", fmt.Errorf("failed to resolve handle %q: %w", repo, err) 29 } 30 } 31 32 service, handle, err := oatproxy.ResolveService(ctx, did) 33 if err != nil { 34 return "", "", "", fmt.Errorf("failed to resolve service for did %q: %w", did, err) 35 } 36 37 return did, service, handle, nil 38} 39 40var maxBlobSize int64 = 1024 * 1024 * 10 // 10MB 41 42func (s *Server) handleComAtprotoRepoUploadBlob(ctx context.Context, r io.Reader, contentType string) (*comatproto.RepoUploadBlob_Output, error) { 43 ctx, span := otel.Tracer("server").Start(ctx, "handleComAtprotoRepoUploadBlob") 44 defer span.End() 45 46 session, client := oatproxy.GetOAuthSession(ctx) 47 if session == nil { 48 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 49 } 50 51 // we need to buffer the blob so we can successfully retry upon dpop nonce changes 52 var err error 53 buf := bytes.Buffer{} 54 _, err = io.CopyN(&buf, r, maxBlobSize+1) 55 if err == nil { 56 return nil, echo.NewHTTPError(http.StatusBadRequest, "blob size exceeds max size") 57 } 58 if !errors.Is(err, io.EOF) { 59 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to copy reader to buffer") 60 } 61 62 var out comatproto.RepoUploadBlob_Output 63 64 err = client.Do(ctx, xrpc.Procedure, contentType, "com.atproto.repo.uploadBlob", nil, bytes.NewReader(buf.Bytes()), &out) 65 66 if err != nil { 67 log.Error(ctx, "upstream xrpc error", "error", err) 68 return nil, err 69 } 70 71 return &out, nil 72} 73 74func (s *Server) handleComAtprotoRepoDescribeRepo(ctx context.Context, repo string) (*comatproto.RepoDescribeRepo_Output, error) { 75 isLocal, svc, err := s.isLocalPDS(ctx, repo) 76 if err != nil { 77 return nil, fmt.Errorf("error checking for local PDS: %w", err) 78 } 79 if !isLocal { 80 var out comatproto.RepoDescribeRepo_Output 81 params := make(map[string]interface{}) 82 params["repo"] = repo 83 84 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.describeRepo", params, &out) 85 if err != nil { 86 log.Error(ctx, "upstream xrpc error", "error", err) 87 return nil, err 88 } 89 return &out, nil 90 91 } 92 93 return &comatproto.RepoDescribeRepo_Output{ 94 Handle: s.cli.MyDID(), 95 Did: s.cli.MyDID(), 96 DidDoc: atproto.DIDDoc(s.cli.BroadcasterHost), 97 Collections: []string{ 98 "com.atproto.lexicon.schema", 99 }, 100 HandleIsCorrect: true, 101 }, nil 102} 103 104func (s *Server) handleComAtprotoRepoListRecords(ctx context.Context, collection string, cursor string, limit int, repo string, reverse *bool) (*comatproto.RepoListRecords_Output, error) { 105 isLocal, svc, err := s.isLocalPDS(ctx, repo) 106 if err != nil { 107 return nil, fmt.Errorf("error checking for local PDS: %w", err) 108 } 109 if !isLocal { 110 var out comatproto.RepoListRecords_Output 111 params := make(map[string]interface{}) 112 params["collection"] = collection 113 if cursor != "" { 114 params["cursor"] = cursor 115 } 116 if limit != 0 { 117 params["limit"] = limit 118 } 119 if reverse != nil { 120 params["reverse"] = *reverse 121 } 122 params["repo"] = repo 123 124 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.listRecords", params, &out) 125 if err != nil { 126 log.Error(ctx, "upstream xrpc error", "error", err) 127 return nil, err 128 } 129 return &out, nil 130 } 131 132 return atproto.LexiconRepoListRecords(ctx, collection, cursor, limit, repo, reverse) 133} 134 135func (s *Server) handleComAtprotoRepoGetRecord(ctx context.Context, c string, collection string, repo string, rkey string) (*comatproto.RepoGetRecord_Output, error) { 136 isLocal, svc, err := s.isLocalPDS(ctx, repo) 137 if err != nil { 138 return nil, fmt.Errorf("error checking for local PDS: %w", err) 139 } 140 if !isLocal { 141 var out comatproto.RepoGetRecord_Output 142 params := make(map[string]interface{}) 143 params["repo"] = repo 144 params["collection"] = collection 145 params["rkey"] = rkey 146 if c != "" { 147 params["cid"] = c 148 } 149 150 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.getRecord", params, &out) 151 if err != nil { 152 log.Error(ctx, "upstream xrpc error", "error", err) 153 return nil, err 154 } 155 return &out, nil 156 } 157 158 return atproto.LexiconRepoGetRecord(ctx, repo, collection, rkey) 159}