tangled
alpha
login
or
join now
stream.place
/
streamplace
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
Proxy repo requests to correct service host
Natalie Bridgers
4 months ago
e32245ae
3c10d384
+196
-2
2 changed files
expand all
collapse all
unified
split
pkg
spxrpc
com_atproto_repo.go
spxrpc.go
+151
-2
pkg/spxrpc/com_atproto_repo.go
···
5
5
"fmt"
6
6
"io"
7
7
"net/http"
8
8
+
"strings"
8
9
9
10
comatprototypes "github.com/bluesky-social/indigo/api/atproto"
10
11
lexutil "github.com/bluesky-social/indigo/lex/util"
···
18
19
"stream.place/streamplace/pkg/log"
19
20
)
20
21
22
22
+
func resolveRepoService(ctx context.Context, repo string) (did, service, handle string, err error) {
23
23
+
did = repo
24
24
+
if !strings.HasPrefix(repo, "did:") {
25
25
+
did, err = oatproxy.ResolveHandle(ctx, repo)
26
26
+
if err != nil {
27
27
+
return "", "", "", fmt.Errorf("failed to resolve handle %q: %w", repo, err)
28
28
+
}
29
29
+
}
30
30
+
31
31
+
service, handle, err = oatproxy.ResolveService(ctx, did)
32
32
+
if err != nil {
33
33
+
return "", "", "", fmt.Errorf("failed to resolve service for did %q: %w", did, err)
34
34
+
}
35
35
+
36
36
+
return did, service, handle, nil
37
37
+
}
38
38
+
21
39
func (s *Server) handleComAtprotoRepoUploadBlob(ctx context.Context, r io.Reader, contentType string) (*comatprototypes.RepoUploadBlob_Output, error) {
22
40
ctx, span := otel.Tracer("server").Start(ctx, "handleComAtprotoRepoUploadBlob")
23
41
defer span.End()
···
43
61
}
44
62
45
63
func (s *Server) handleComAtprotoRepoDescribeRepo(ctx context.Context, repo string) (*comatprototypes.RepoDescribeRepo_Output, error) {
64
64
+
did, svc, handle, err := resolveRepoService(ctx, repo)
65
65
+
if err != nil {
66
66
+
return nil, fmt.Errorf("handleComAtprotoRepoDescribeRepo: %w", err)
67
67
+
}
68
68
+
69
69
+
// if the service isn't the current host, we proxy the request
70
70
+
if svc != s.cli.PublicHost {
71
71
+
session, client := oatproxy.GetOAuthSession(ctx)
72
72
+
if session != nil {
73
73
+
// authenticated request
74
74
+
var out *comatprototypes.RepoDescribeRepo_Output
75
75
+
params := make(map[string]any)
76
76
+
params["repo"] = repo
77
77
+
78
78
+
err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.describeRepo", params, nil, &out)
79
79
+
if err != nil {
80
80
+
log.Error(ctx, "upstream xrpc error", "error", err)
81
81
+
return nil, err
82
82
+
}
83
83
+
return out, nil
84
84
+
} else {
85
85
+
// unauthenticated request
86
86
+
var out comatprototypes.RepoDescribeRepo_Output
87
87
+
params := make(map[string]interface{})
88
88
+
params["repo"] = repo
89
89
+
90
90
+
err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.describeRepo", params, &out)
91
91
+
if err != nil {
92
92
+
log.Error(ctx, "upstream xrpc error", "error", err)
93
93
+
return nil, err
94
94
+
}
95
95
+
return &out, nil
96
96
+
}
97
97
+
}
98
98
+
46
99
return &comatprototypes.RepoDescribeRepo_Output{
47
47
-
Handle: s.cli.PublicHost,
48
48
-
Did: fmt.Sprintf("did:web:%s", s.cli.PublicHost),
100
100
+
Handle: handle,
101
101
+
Did: did,
49
102
DidDoc: atproto.DIDDoc(s.cli.PublicHost),
50
103
Collections: []string{
51
104
"com.atproto.lexicon.schema",
···
55
108
}
56
109
57
110
func (s *Server) handleComAtprotoRepoListRecords(ctx context.Context, collection string, cursor string, limit int, repo string, reverse *bool) (*comatprototypes.RepoListRecords_Output, error) {
111
111
+
_, svc, _, err := resolveRepoService(ctx, repo)
112
112
+
if err != nil {
113
113
+
return nil, fmt.Errorf("handleComAtprotoRepoListRecords: %w", err)
114
114
+
}
115
115
+
// if the service isn't the current host, we proxy the request
116
116
+
if svc != s.cli.PublicHost {
117
117
+
session, client := oatproxy.GetOAuthSession(ctx)
118
118
+
if session != nil {
119
119
+
// authenticated request
120
120
+
var out *comatprototypes.RepoListRecords_Output
121
121
+
xrpcType := xrpc.Procedure
122
122
+
params := make(map[string]any)
123
123
+
params["collection"] = collection
124
124
+
if cursor != "" {
125
125
+
params["cursor"] = cursor
126
126
+
}
127
127
+
if limit != 0 {
128
128
+
params["limit"] = limit
129
129
+
}
130
130
+
if reverse != nil {
131
131
+
params["reverse"] = *reverse
132
132
+
}
133
133
+
err = client.Do(ctx, xrpcType, "application/json", "com.atproto.repo.listRecords", params, nil, &out)
134
134
+
if err != nil {
135
135
+
log.Error(ctx, "upstream xrpc error", "error", err)
136
136
+
return nil, err
137
137
+
}
138
138
+
return out, nil
139
139
+
} else {
140
140
+
// unauthenticated request
141
141
+
var out comatprototypes.RepoListRecords_Output
142
142
+
params := make(map[string]interface{})
143
143
+
params["collection"] = collection
144
144
+
if cursor != "" {
145
145
+
params["cursor"] = cursor
146
146
+
}
147
147
+
if limit != 0 {
148
148
+
params["limit"] = limit
149
149
+
}
150
150
+
if reverse != nil {
151
151
+
params["reverse"] = *reverse
152
152
+
}
153
153
+
err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.listRecords", params, &out)
154
154
+
if err != nil {
155
155
+
log.Error(ctx, "upstream xrpc error", "error", err)
156
156
+
return nil, err
157
157
+
}
158
158
+
return &out, nil
159
159
+
}
160
160
+
}
161
161
+
58
162
r, ses, err := atproto.OpenLexiconRepo(ctx)
59
163
if err != nil {
60
164
return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err)
···
82
186
}
83
187
84
188
func (s *Server) handleComAtprotoRepoGetRecord(ctx context.Context, c string, collection string, repo string, rkey string) (*comatprototypes.RepoGetRecord_Output, error) {
189
189
+
_, svc, _, err := resolveRepoService(ctx, repo)
190
190
+
if err != nil {
191
191
+
return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: %w", err)
192
192
+
}
193
193
+
194
194
+
// if the service isn't the current host, we proxy the request
195
195
+
if svc != s.cli.PublicHost {
196
196
+
session, client := oatproxy.GetOAuthSession(ctx)
197
197
+
if session != nil {
198
198
+
// authenticated request
199
199
+
var out *comatprototypes.RepoGetRecord_Output
200
200
+
params := make(map[string]interface{})
201
201
+
params["repo"] = repo
202
202
+
params["collection"] = collection
203
203
+
params["rkey"] = rkey
204
204
+
if c != "" {
205
205
+
params["cid"] = c
206
206
+
}
207
207
+
208
208
+
err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", params, nil, &out)
209
209
+
if err != nil {
210
210
+
log.Error(ctx, "upstream xrpc error", "error", err)
211
211
+
return nil, err
212
212
+
}
213
213
+
return out, nil
214
214
+
} else {
215
215
+
// unauthenticated request
216
216
+
var out comatprototypes.RepoGetRecord_Output
217
217
+
params := make(map[string]interface{})
218
218
+
params["repo"] = repo
219
219
+
params["collection"] = collection
220
220
+
params["rkey"] = rkey
221
221
+
if c != "" {
222
222
+
params["cid"] = c
223
223
+
}
224
224
+
225
225
+
err = makeUnauthenticatedRequest(ctx, svc, "com.atproto.repo.getRecord", params, &out)
226
226
+
if err != nil {
227
227
+
log.Error(ctx, "upstream xrpc error", "error", err)
228
228
+
return nil, err
229
229
+
}
230
230
+
return &out, nil
231
231
+
}
232
232
+
}
233
233
+
85
234
r, ses, err := atproto.OpenLexiconRepo(ctx)
86
235
if err != nil {
87
236
return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
+45
pkg/spxrpc/spxrpc.go
···
2
2
3
3
import (
4
4
"context"
5
5
+
"encoding/json"
6
6
+
"fmt"
7
7
+
"io"
5
8
"net/http"
9
9
+
"net/url"
6
10
"time"
7
11
8
12
"github.com/labstack/echo/v4"
···
10
14
"github.com/slok/go-http-metrics/middleware"
11
15
echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
12
16
"github.com/streamplace/oatproxy/pkg/oatproxy"
17
17
+
"stream.place/streamplace/pkg/aqhttp"
13
18
"stream.place/streamplace/pkg/atproto"
14
19
"stream.place/streamplace/pkg/config"
15
20
"stream.place/streamplace/pkg/log"
···
59
64
e.GET("/xrpc/*", s.HandleWildcard)
60
65
e.POST("/xrpc/*", s.HandleWildcard)
61
66
return s, nil
67
67
+
}
68
68
+
69
69
+
func makeUnauthenticatedRequest(ctx context.Context, service, method string, params map[string]interface{}, out interface{}) error {
70
70
+
u, err := url.Parse(fmt.Sprintf("https://%s/xrpc/%s", service, method))
71
71
+
if err != nil {
72
72
+
return fmt.Errorf("failed to parse URL: %w", err)
73
73
+
}
74
74
+
75
75
+
// add query parameters
76
76
+
query := u.Query()
77
77
+
for k, v := range params {
78
78
+
query.Set(k, fmt.Sprintf("%v", v))
79
79
+
}
80
80
+
u.RawQuery = query.Encode()
81
81
+
82
82
+
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
83
83
+
if err != nil {
84
84
+
return fmt.Errorf("failed to create request: %w", err)
85
85
+
}
86
86
+
87
87
+
resp, err := aqhttp.Client.Do(req)
88
88
+
if err != nil {
89
89
+
return fmt.Errorf("failed to make request: %w", err)
90
90
+
}
91
91
+
defer resp.Body.Close()
92
92
+
93
93
+
if resp.StatusCode != http.StatusOK {
94
94
+
return fmt.Errorf("upstream request failed with status %d", resp.StatusCode)
95
95
+
}
96
96
+
97
97
+
body, err := io.ReadAll(resp.Body)
98
98
+
if err != nil {
99
99
+
return fmt.Errorf("failed to read response body: %w", err)
100
100
+
}
101
101
+
102
102
+
if err := json.Unmarshal(body, out); err != nil {
103
103
+
return fmt.Errorf("failed to unmarshal response: %w", err)
104
104
+
}
105
105
+
106
106
+
return nil
62
107
}
63
108
64
109
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {