Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "net/http"
10 "strings"
11
12 comatprototypes "github.com/bluesky-social/indigo/api/atproto"
13 lexutil "github.com/bluesky-social/indigo/lex/util"
14
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/ipfs/go-cid"
17 "github.com/labstack/echo/v4"
18 "github.com/streamplace/oatproxy/pkg/oatproxy"
19 "go.opentelemetry.io/otel"
20 "stream.place/streamplace/pkg/atproto"
21 "stream.place/streamplace/pkg/log"
22)
23
24func resolveRepoService(ctx context.Context, repo string) (string, string, string, error) {
25 did := repo
26 var err error
27 if !strings.HasPrefix(repo, "did:") {
28 did, err = oatproxy.ResolveHandle(ctx, repo)
29 if err != nil {
30 return "", "", "", fmt.Errorf("failed to resolve handle %q: %w", repo, err)
31 }
32 }
33
34 service, handle, err := oatproxy.ResolveService(ctx, did)
35 if err != nil {
36 return "", "", "", fmt.Errorf("failed to resolve service for did %q: %w", did, err)
37 }
38
39 return did, service, handle, nil
40}
41
42var maxBlobSize int64 = 1024 * 1024 * 10 // 10MB
43
44func (s *Server) handleComAtprotoRepoUploadBlob(ctx context.Context, r io.Reader, contentType string) (*comatprototypes.RepoUploadBlob_Output, error) {
45 ctx, span := otel.Tracer("server").Start(ctx, "handleComAtprotoRepoUploadBlob")
46 defer span.End()
47
48 session, client := oatproxy.GetOAuthSession(ctx)
49 if session == nil {
50 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
51 }
52
53 // we need to buffer the blob so we can successfully retry upon dpop nonce changes
54 var err error
55 buf := bytes.Buffer{}
56 _, err = io.CopyN(&buf, r, maxBlobSize+1)
57 if err == nil {
58 return nil, echo.NewHTTPError(http.StatusBadRequest, "blob size exceeds max size")
59 }
60 if !errors.Is(err, io.EOF) {
61 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to copy reader to buffer")
62 }
63
64 var out comatprototypes.RepoUploadBlob_Output
65
66 err = client.Do(ctx, xrpc.Procedure, contentType, "com.atproto.repo.uploadBlob", nil, bytes.NewReader(buf.Bytes()), &out)
67
68 if err != nil {
69 log.Error(ctx, "upstream xrpc error", "error", err)
70 return nil, err
71 }
72
73 return &out, nil
74}
75
76func (s *Server) handleComAtprotoRepoDescribeRepo(ctx context.Context, repo string) (*comatprototypes.RepoDescribeRepo_Output, error) {
77 isLocal, svc, err := s.isLocalPDS(ctx, repo)
78 if err != nil {
79 return nil, fmt.Errorf("error checking for local PDS: %w", err)
80 }
81 if !isLocal {
82 var out comatprototypes.RepoDescribeRepo_Output
83 params := make(map[string]interface{})
84 params["repo"] = repo
85
86 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.describeRepo", params, &out)
87 if err != nil {
88 log.Error(ctx, "upstream xrpc error", "error", err)
89 return nil, err
90 }
91 return &out, nil
92
93 }
94
95 return &comatprototypes.RepoDescribeRepo_Output{
96 Handle: s.cli.MyDID(),
97 Did: s.cli.MyDID(),
98 DidDoc: atproto.DIDDoc(s.cli.PublicHost),
99 Collections: []string{
100 "com.atproto.lexicon.schema",
101 },
102 HandleIsCorrect: true,
103 }, nil
104}
105
106func (s *Server) handleComAtprotoRepoListRecords(ctx context.Context, collection string, cursor string, limit int, repo string, reverse *bool) (*comatprototypes.RepoListRecords_Output, error) {
107 isLocal, svc, err := s.isLocalPDS(ctx, repo)
108 if err != nil {
109 return nil, fmt.Errorf("error checking for local PDS: %w", err)
110 }
111 if !isLocal {
112 var out comatprototypes.RepoListRecords_Output
113 params := make(map[string]interface{})
114 params["collection"] = collection
115 if cursor != "" {
116 params["cursor"] = cursor
117 }
118 if limit != 0 {
119 params["limit"] = limit
120 }
121 if reverse != nil {
122 params["reverse"] = *reverse
123 }
124 params["repo"] = repo
125
126 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.listRecords", params, &out)
127 if err != nil {
128 log.Error(ctx, "upstream xrpc error", "error", err)
129 return nil, err
130 }
131 return &out, nil
132 }
133
134 r, ses, err := atproto.OpenLexiconRepo(ctx)
135 if err != nil {
136 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err)
137 }
138 out := &comatprototypes.RepoListRecords_Output{
139 Records: []*comatprototypes.RepoListRecords_Record{},
140 }
141 err = r.ForEach(ctx, "", func(rkey string, c cid.Cid) error {
142 val, err := atproto.GetRecordCBOR(ctx, ses, c, collection, rkey)
143 if err != nil {
144 return fmt.Errorf("handleComAtprotoRepoListRecords: failed to get record for collection %q, rkey %q: %w", collection, rkey, err)
145 }
146 out.Records = append(out.Records, &comatprototypes.RepoListRecords_Record{
147 Uri: fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey),
148 Cid: c.String(),
149 Value: &lexutil.LexiconTypeDecoder{Val: val},
150 })
151
152 return nil
153 })
154 if err != nil {
155 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: error iterating records for collection %q: %w", collection, err)
156 }
157 return out, nil
158}
159
160func (s *Server) handleComAtprotoRepoGetRecord(ctx context.Context, c string, collection string, repo string, rkey string) (*comatprototypes.RepoGetRecord_Output, error) {
161 isLocal, svc, err := s.isLocalPDS(ctx, repo)
162 if err != nil {
163 return nil, fmt.Errorf("error checking for local PDS: %w", err)
164 }
165 if !isLocal {
166 var out comatprototypes.RepoGetRecord_Output
167 params := make(map[string]interface{})
168 params["repo"] = repo
169 params["collection"] = collection
170 params["rkey"] = rkey
171 if c != "" {
172 params["cid"] = c
173 }
174
175 err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.getRecord", params, &out)
176 if err != nil {
177 log.Error(ctx, "upstream xrpc error", "error", err)
178 return nil, err
179 }
180 return &out, nil
181 }
182
183 r, ses, err := atproto.OpenLexiconRepo(ctx)
184 if err != nil {
185 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
186 }
187 outCID, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", collection, rkey))
188 if err != nil {
189 return nil, err
190 }
191 rec, err := atproto.GetRecordCBOR(ctx, ses, outCID, collection, rkey)
192 if err != nil {
193 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record: %w", err)
194 }
195 str := outCID.String()
196 return &comatprototypes.RepoGetRecord_Output{
197 Uri: fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey),
198 Cid: &str,
199 Value: &lexutil.LexiconTypeDecoder{Val: rec},
200 }, nil
201}