+8
-5
bgs/bgs.go
+8
-5
bgs/bgs.go
···
596
596
consumerID := bgs.registerConsumer(&consumer)
597
597
defer bgs.cleanupConsumer(consumerID)
598
598
599
-
log.Infow("new consumer",
599
+
logger := log.With(
600
+
"consumer_id", consumerID,
600
601
"remote_addr", consumer.RemoteAddr,
601
602
"user_agent", consumer.UserAgent,
602
-
"cursor", since,
603
-
"consumer_id", consumerID,
604
603
)
605
604
605
+
logger.Infow("new consumer", "cursor", since)
606
+
606
607
header := events.EventHeader{Op: events.EvtKindMessage}
607
608
for {
608
609
select {
609
610
case evt, ok := <-evts:
610
611
if !ok {
612
+
logger.Error("event stream closed unexpectedly")
611
613
return nil
612
614
}
615
+
613
616
wc, err := conn.NextWriter(websocket.BinaryMessage)
614
617
if err != nil {
615
-
log.Errorf("failed to get next writer: %s", err)
618
+
logger.Errorf("failed to get next writer: %s", err)
616
619
return err
617
620
}
618
621
···
650
653
}
651
654
652
655
if err := wc.Close(); err != nil {
653
-
log.Warnf("failed to flush-close our event write: %s", err)
656
+
logger.Warnf("failed to flush-close our event write: %s", err)
654
657
return nil
655
658
}
656
659
+79
-1
bgs/fedmgr.go
+79
-1
bgs/fedmgr.go
···
5
5
"errors"
6
6
"fmt"
7
7
"math/rand"
8
+
"strings"
8
9
"sync"
9
10
"time"
10
11
···
16
17
"golang.org/x/time/rate"
17
18
18
19
"github.com/gorilla/websocket"
20
+
pq "github.com/lib/pq"
19
21
"gorm.io/gorm"
20
22
)
21
23
···
36
38
DefaultCrawlLimit rate.Limit
37
39
38
40
newSubsDisabled bool
41
+
trustedDomains []string
39
42
40
43
shutdownChan chan bool
41
44
shutdownResult chan []error
···
171
174
}
172
175
173
176
s.newSubsDisabled = sc.NewSubsDisabled
177
+
s.trustedDomains = sc.TrustedDomains
174
178
175
179
return nil
176
180
}
···
179
183
gorm.Model
180
184
181
185
NewSubsDisabled bool
186
+
TrustedDomains pq.StringArray `gorm:"type:text[]"`
182
187
}
183
188
184
189
func (s *Slurper) SetNewSubsDisabled(dis bool) error {
···
199
204
return s.newSubsDisabled
200
205
}
201
206
207
+
func (s *Slurper) AddTrustedDomain(domain string) error {
208
+
s.lk.Lock()
209
+
defer s.lk.Unlock()
210
+
211
+
if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", gorm.Expr("array_append(trusted_domains, ?)", domain)).Error; err != nil {
212
+
return err
213
+
}
214
+
215
+
s.trustedDomains = append(s.trustedDomains, domain)
216
+
return nil
217
+
}
218
+
219
+
func (s *Slurper) RemoveTrustedDomain(domain string) error {
220
+
s.lk.Lock()
221
+
defer s.lk.Unlock()
222
+
223
+
if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", gorm.Expr("array_remove(trusted_domains, ?)", domain)).Error; err != nil {
224
+
if errors.Is(err, gorm.ErrRecordNotFound) {
225
+
return nil
226
+
}
227
+
return err
228
+
}
229
+
230
+
for i, d := range s.trustedDomains {
231
+
if d == domain {
232
+
s.trustedDomains = append(s.trustedDomains[:i], s.trustedDomains[i+1:]...)
233
+
break
234
+
}
235
+
}
236
+
237
+
return nil
238
+
}
239
+
240
+
func (s *Slurper) SetTrustedDomains(domains []string) error {
241
+
s.lk.Lock()
242
+
defer s.lk.Unlock()
243
+
244
+
if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", domains).Error; err != nil {
245
+
return err
246
+
}
247
+
248
+
s.trustedDomains = domains
249
+
return nil
250
+
}
251
+
252
+
func (s *Slurper) GetTrustedDomains() []string {
253
+
s.lk.Lock()
254
+
defer s.lk.Unlock()
255
+
return s.trustedDomains
256
+
}
257
+
202
258
var ErrNewSubsDisabled = fmt.Errorf("new subscriptions temporarily disabled")
203
259
260
+
// Checks whether a host is allowed to be subscribed to
261
+
// must be called with the slurper lock held
262
+
func (s *Slurper) canSlurpHost(host string) bool {
263
+
// Check if the host is a trusted domain
264
+
for _, d := range s.trustedDomains {
265
+
// If the domain starts with a *., it's a wildcard
266
+
if strings.HasPrefix(d, "*.") {
267
+
// Cut off the * so we have .domain.com
268
+
if strings.HasSuffix(host, strings.TrimPrefix(d, "*")) {
269
+
return true
270
+
}
271
+
} else {
272
+
if host == d {
273
+
return true
274
+
}
275
+
}
276
+
}
277
+
278
+
return !s.newSubsDisabled
279
+
}
280
+
204
281
func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool) error {
205
282
// TODO: for performance, lock on the hostname instead of global
206
283
s.lk.Lock()
207
284
defer s.lk.Unlock()
208
-
if s.newSubsDisabled {
285
+
286
+
if !s.canSlurpHost(host) {
209
287
return ErrNewSubsDisabled
210
288
}
211
289
+33
-12
bgs/handlers.go
+33
-12
bgs/handlers.go
···
7
7
"fmt"
8
8
"io"
9
9
"net/http"
10
+
"net/url"
10
11
"strconv"
11
12
"strings"
12
13
···
17
18
"github.com/bluesky-social/indigo/mst"
18
19
"gorm.io/gorm"
19
20
20
-
"github.com/bluesky-social/indigo/util"
21
21
"github.com/bluesky-social/indigo/xrpc"
22
22
"github.com/ipfs/go-cid"
23
23
cbor "github.com/ipfs/go-ipld-cbor"
···
108
108
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
109
109
}
110
110
111
-
if strings.HasPrefix(host, "https://") || strings.HasPrefix(host, "http://") {
112
-
return echo.NewHTTPError(http.StatusBadRequest, "must pass domain without protocol scheme")
111
+
if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
112
+
if s.ssl {
113
+
host = "https://" + host
114
+
} else {
115
+
host = "http://" + host
116
+
}
113
117
}
114
118
115
-
norm, err := util.NormalizeHostname(host)
119
+
u, err := url.Parse(host)
116
120
if err != nil {
117
-
return echo.NewHTTPError(http.StatusBadRequest, "failed to normalize hostname")
121
+
return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
122
+
}
123
+
124
+
if u.Scheme == "http" && s.ssl {
125
+
return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
126
+
}
127
+
128
+
if u.Scheme == "https" && !s.ssl {
129
+
return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
130
+
}
131
+
132
+
if u.Path != "" {
133
+
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
134
+
}
135
+
136
+
if u.Query().Encode() != "" {
137
+
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query")
118
138
}
119
139
140
+
host = u.Host // potentially hostname:port
141
+
120
142
banned, err := s.domainIsBanned(ctx, host)
121
143
if banned {
122
144
return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned")
···
124
146
125
147
log.Warnf("TODO: better host validation for crawl requests")
126
148
149
+
clientHost := fmt.Sprintf("%s://%s", u.Scheme, host)
150
+
127
151
c := &xrpc.Client{
128
-
Host: "https://" + host,
152
+
Host: clientHost,
129
153
Client: http.DefaultClient, // not using the client that auto-retries
130
-
}
131
-
132
-
if !s.ssl {
133
-
c.Host = "http://" + host
134
154
}
135
155
136
156
desc, err := atproto.ServerDescribeServer(ctx, c)
137
157
if err != nil {
138
-
return echo.NewHTTPError(http.StatusBadRequest, "requested host failed to respond to describe request")
158
+
errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost)
159
+
return echo.NewHTTPError(http.StatusBadRequest, errMsg)
139
160
}
140
161
141
162
// Maybe we could do something with this response later
142
163
_ = desc
143
164
144
-
return s.slurper.SubscribeToPds(ctx, norm, true)
165
+
return s.slurper.SubscribeToPds(ctx, host, true)
145
166
}
146
167
147
168
func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
+1
go.mod
+1
go.mod
+2
go.sum
+2
go.sum
···
405
405
github.com/lestrrat-go/option v1.0.0/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
406
406
github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
407
407
github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
408
+
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
409
+
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
408
410
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
409
411
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
410
412
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=