Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "net/http"
10 "strings"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/labstack/echo/v4"
16 "github.com/streamplace/oatproxy/pkg/oatproxy"
17 "go.opentelemetry.io/otel"
18 "stream.place/streamplace/pkg/atproto"
19 "stream.place/streamplace/pkg/log"
20)
21
22func resolveRepoService(ctx context.Context, repo string) (string, string, string, error) {
23 did := repo
24 var err error
25 if !strings.HasPrefix(repo, "did:") {
26 did, err = oatproxy.ResolveHandle(ctx, repo)
27 if err != nil {
28 return "", "", "", fmt.Errorf("failed to resolve handle %q: %w", repo, err)
29 }
30 }
31
32 service, handle, err := oatproxy.ResolveService(ctx, did)
33 if err != nil {
34 return "", "", "", fmt.Errorf("failed to resolve service for did %q: %w", did, err)
35 }
36
37 return did, service, handle, nil
38}
39
40var maxBlobSize int64 = 1024 * 1024 * 10 // 10MB
41
42func (s *Server) handleComAtprotoRepoUploadBlob(ctx context.Context, r io.Reader, contentType string) (*comatproto.RepoUploadBlob_Output, error) {
43 ctx, span := otel.Tracer("server").Start(ctx, "handleComAtprotoRepoUploadBlob")
44 defer span.End()
45
46 session, client := oatproxy.GetOAuthSession(ctx)
47 if session == nil {
48 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
49 }
50
51 // we need to buffer the blob so we can successfully retry upon dpop nonce changes
52 var err error
53 buf := bytes.Buffer{}
54 _, err = io.CopyN(&buf, r, maxBlobSize+1)
55 if err == nil {
56 return nil, echo.NewHTTPError(http.StatusBadRequest, "blob size exceeds max size")
57 }
58 if !errors.Is(err, io.EOF) {
59 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to copy reader to buffer")
60 }
61
62 var out comatproto.RepoUploadBlob_Output
63
64 err = client.Do(ctx, xrpc.Procedure, contentType, "com.atproto.repo.uploadBlob", nil, bytes.NewReader(buf.Bytes()), &out)
65
66 if err != nil {
67 log.Error(ctx, "upstream xrpc error", "error", err)
68 return nil, err
69 }
70
71 return &out, nil
72}
73
74func (s *Server) handleComAtprotoRepoDescribeRepo(ctx context.Context, repo string) (*comatproto.RepoDescribeRepo_Output, error) {
75 isLocal, svc, err := s.isLocalPDS(ctx, repo)
76 if err != nil {
77 return nil, fmt.Errorf("error checking for local PDS: %w", err)
78 }
79 if !isLocal {
80 var out comatproto.RepoDescribeRepo_Output
81 params := make(map[string]interface{})
82 params["repo"] = repo
83
84 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.describeRepo", params, &out)
85 if err != nil {
86 log.Error(ctx, "upstream xrpc error", "error", err)
87 return nil, err
88 }
89 return &out, nil
90
91 }
92
93 return &comatproto.RepoDescribeRepo_Output{
94 Handle: s.cli.MyDID(),
95 Did: s.cli.MyDID(),
96 DidDoc: atproto.DIDDoc(s.cli.BroadcasterHost),
97 Collections: []string{
98 "com.atproto.lexicon.schema",
99 },
100 HandleIsCorrect: true,
101 }, nil
102}
103
104func (s *Server) handleComAtprotoRepoListRecords(ctx context.Context, collection string, cursor string, limit int, repo string, reverse *bool) (*comatproto.RepoListRecords_Output, error) {
105 isLocal, svc, err := s.isLocalPDS(ctx, repo)
106 if err != nil {
107 return nil, fmt.Errorf("error checking for local PDS: %w", err)
108 }
109 if !isLocal {
110 var out comatproto.RepoListRecords_Output
111 params := make(map[string]interface{})
112 params["collection"] = collection
113 if cursor != "" {
114 params["cursor"] = cursor
115 }
116 if limit != 0 {
117 params["limit"] = limit
118 }
119 if reverse != nil {
120 params["reverse"] = *reverse
121 }
122 params["repo"] = repo
123
124 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.listRecords", params, &out)
125 if err != nil {
126 log.Error(ctx, "upstream xrpc error", "error", err)
127 return nil, err
128 }
129 return &out, nil
130 }
131
132 return atproto.LexiconRepoListRecords(ctx, collection, cursor, limit, repo, reverse)
133}
134
135func (s *Server) handleComAtprotoRepoGetRecord(ctx context.Context, c string, collection string, repo string, rkey string) (*comatproto.RepoGetRecord_Output, error) {
136 isLocal, svc, err := s.isLocalPDS(ctx, repo)
137 if err != nil {
138 return nil, fmt.Errorf("error checking for local PDS: %w", err)
139 }
140 if !isLocal {
141 var out comatproto.RepoGetRecord_Output
142 params := make(map[string]interface{})
143 params["repo"] = repo
144 params["collection"] = collection
145 params["rkey"] = rkey
146 if c != "" {
147 params["cid"] = c
148 }
149
150 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.getRecord", params, &out)
151 if err != nil {
152 log.Error(ctx, "upstream xrpc error", "error", err)
153 return nil, err
154 }
155 return &out, nil
156 }
157
158 return atproto.LexiconRepoGetRecord(ctx, repo, collection, rkey)
159}