+10
-9
HACKING.md
+10
-9
HACKING.md
···
3
3
4
4
Run with, eg, `go run ./cmd/bigsky`):
5
5
6
-
- `cmd/bigsky`: BGS+indexer daemon
6
+
- `cmd/bigsky`: Relay+indexer daemon
7
7
- `cmd/palomar`: search indexer and query service (OpenSearch)
8
8
- `cmd/gosky`: client CLI for talking to a PDS
9
9
- `cmd/lexgen`: codegen tool for lexicons (Lexicon JSON to Go package)
···
25
25
- `atproto/syntax`: string types and parsers for identifiers, datetimes, etc
26
26
- `atproto/identity`: DID and handle resolution
27
27
- `automod`: moderation and anti-spam rules engine
28
-
- `bgs`: server implementation for crawling, etc
28
+
- `bgs`: relay server implementation for crawling, etc
29
29
- `carstore`: library for storing repo data in CAR files on disk, plus a metadata SQL db
30
30
- `events`: types, codegen CBOR helpers, and persistence for event feeds
31
31
- `indexer`: aggregator, handling like counts etc in SQL database
···
45
45
46
46
## Jargon
47
47
48
-
- BGS: Big Graph Service (or Server), which centrals crawls/consumes content from "all" PDSs and re-broadcasts as a firehose
48
+
- Relay: service which crawls/consumes content from "all" PDSs and re-broadcasts as a firehose
49
+
- BGS: Big Graph Service, previous name for what is now "Relay"
49
50
- PDS: Personal Data Server (or Service), which stores user atproto repositories and acts as a user agent in the network
50
51
- CLI: Command Line Interface
51
52
- CBOR: a binary serialization format, similar to JSON
···
89
90
# consume repo events from PDS
90
91
websocat ws://localhost:4989/events
91
92
92
-
# consume repo events from BGS
93
+
# consume repo events from Relay
93
94
websocat ws://localhost:2470/events
94
95
95
-
Send the BGS a ding-dong:
96
+
Send the Relay a ding-dong:
96
97
97
-
# tell BGS to consume from PDS
98
+
# tell Relay to consume from PDS
98
99
http --json post localhost:2470/add-target host="localhost:4989"
99
100
100
101
Set the log level to be more verbose, using an env variable:
···
114
115
115
116
## Integrated Development
116
117
117
-
Sometimes it is helpful to run a PLC, PDS, BGS, and other components, all locally on your laptop, across languages. This section describes one setup for this.
118
+
Sometimes it is helpful to run a PLC, PDS, Relay, and other components, all locally on your laptop, across languages. This section describes one setup for this.
118
119
119
120
First, you need PostgreSQL running locally. This could be via docker, or the following commands assume some kind of debian/ubuntu setup with a postgres server package installed and running.
120
121
···
139
140
140
141
make run-dev-pds
141
142
142
-
In this repo (indigo), start a BGS, in two separate terminals:
143
+
In this repo (indigo), start a Relay, in two separate terminals:
143
144
144
-
make run-dev-bgs
145
+
make run-dev-relay
145
146
146
147
In a final terminal, run fakermaker to inject data into the system:
147
148
+6
-6
Makefile
+6
-6
Makefile
···
78
78
run-postgres: .env ## Runs a local postgres instance
79
79
docker compose -f cmd/bigsky/docker-compose.yml up -d
80
80
81
-
.PHONY: run-dev-bgs
82
-
run-dev-bgs: .env ## Runs 'bigsky' BGS for local dev
81
+
.PHONY: run-dev-relay
82
+
run-dev-relay: .env ## Runs 'bigsky' Relay for local dev
83
83
GOLOG_LOG_LEVEL=info go run ./cmd/bigsky --admin-key localdev
84
84
# --crawl-insecure-ws
85
85
86
-
.PHONY: build-bgs-image
87
-
build-bgs-image: ## Builds 'bigsky' BGS docker image
86
+
.PHONY: build-relay-image
87
+
build-relay-image: ## Builds 'bigsky' Relay docker image
88
88
docker build -t bigsky -f cmd/bigsky/Dockerfile .
89
89
90
-
.PHONY: run-bgs-image
91
-
run-bgs-image:
90
+
.PHONY: run-relay-image
91
+
run-relay-image:
92
92
docker run -p 2470:2470 bigsky /bigsky --admin-key localdev
93
93
# --crawl-insecure-ws
94
94
+1
-1
cmd/bigsky/Dockerfile
+1
-1
cmd/bigsky/Dockerfile
···
45
45
CMD ["/bigsky"]
46
46
47
47
LABEL org.opencontainers.image.source=https://github.com/bluesky-social/indigo
48
-
LABEL org.opencontainers.image.description="ATP Big Graph Server (BGS)"
48
+
LABEL org.opencontainers.image.description="ATP Relay (aka BGS)"
49
49
LABEL org.opencontainers.image.licenses=MIT
+2
-2
cmd/bigsky/README.md
+2
-2
cmd/bigsky/README.md
+5
-5
cmd/hepa/consumer.go
+5
-5
cmd/hepa/consumer.go
···
32
32
}
33
33
34
34
dialer := websocket.DefaultDialer
35
-
u, err := url.Parse(s.bgshost)
35
+
u, err := url.Parse(s.relayHost)
36
36
if err != nil {
37
-
return fmt.Errorf("invalid bgshost URI: %w", err)
37
+
return fmt.Errorf("invalid relayHost URI: %w", err)
38
38
}
39
39
u.Path = "xrpc/com.atproto.sync.subscribeRepos"
40
40
if cur != 0 {
41
41
u.RawQuery = fmt.Sprintf("cursor=%d", cur)
42
42
}
43
-
s.logger.Info("subscribing to repo event stream", "upstream", s.bgshost, "cursor", cur)
43
+
s.logger.Info("subscribing to repo event stream", "upstream", s.relayHost, "cursor", cur)
44
44
con, _, err := dialer.Dial(u.String(), http.Header{
45
45
"User-Agent": []string{fmt.Sprintf("hepa/%s", versioninfo.Short())},
46
46
})
···
98
98
scheduler = parallel.NewScheduler(
99
99
s.firehoseParallelism,
100
100
1000,
101
-
s.bgshost,
101
+
s.relayHost,
102
102
rsc.EventHandler,
103
103
)
104
104
s.logger.Info("hepa scheduler configured", "scheduler", "parallel", "initial", s.firehoseParallelism)
···
108
108
// start at higher parallelism (somewhat arbitrary)
109
109
scaleSettings.Concurrency = 4
110
110
scaleSettings.MaxConcurrency = 200
111
-
scheduler = autoscaling.NewScheduler(scaleSettings, s.bgshost, rsc.EventHandler)
111
+
scheduler = autoscaling.NewScheduler(scaleSettings, s.relayHost, rsc.EventHandler)
112
112
s.logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency)
113
113
}
114
114
+5
-5
cmd/hepa/main.go
+5
-5
cmd/hepa/main.go
···
41
41
42
42
app.Flags = []cli.Flag{
43
43
&cli.StringFlag{
44
-
Name: "atp-bgs-host",
45
-
Usage: "hostname and port of BGS to subscribe to",
44
+
Name: "atp-relay-host",
45
+
Usage: "hostname and port of Relay to subscribe to",
46
46
Value: "wss://bsky.network",
47
-
EnvVars: []string{"ATP_BGS_HOST"},
47
+
EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"},
48
48
},
49
49
&cli.StringFlag{
50
50
Name: "atp-plc-host",
···
219
219
srv, err := NewServer(
220
220
dir,
221
221
Config{
222
-
BGSHost: cctx.String("atp-bgs-host"),
222
+
RelayHost: cctx.String("atp-relay-host"),
223
223
BskyHost: cctx.String("atp-bsky-host"),
224
224
Logger: logger,
225
225
ModHost: cctx.String("atp-mod-host"),
···
286
286
return NewServer(
287
287
dir,
288
288
Config{
289
-
BGSHost: cctx.String("atp-bgs-host"),
289
+
RelayHost: cctx.String("atp-relay-host"),
290
290
BskyHost: cctx.String("atp-bsky-host"),
291
291
Logger: logger,
292
292
ModHost: cctx.String("atp-mod-host"),
+6
-6
cmd/hepa/server.go
+6
-6
cmd/hepa/server.go
···
27
27
)
28
28
29
29
type Server struct {
30
-
bgshost string
30
+
relayHost string
31
31
firehoseParallelism int
32
32
logger *slog.Logger
33
33
engine *automod.Engine
···
41
41
}
42
42
43
43
type Config struct {
44
-
BGSHost string
44
+
RelayHost string
45
45
BskyHost string
46
46
ModHost string
47
47
ModAdminToken string
···
67
67
}))
68
68
}
69
69
70
-
bgsws := config.BGSHost
71
-
if !strings.HasPrefix(bgsws, "ws") {
72
-
return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'")
70
+
relayws := config.RelayHost
71
+
if !strings.HasPrefix(relayws, "ws") {
72
+
return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'")
73
73
}
74
74
75
75
// TODO: this isn't a very robust way to handle a persistent client
···
206
206
}
207
207
208
208
s := &Server{
209
-
bgshost: config.BGSHost,
209
+
relayHost: config.RelayHost,
210
210
firehoseParallelism: config.FirehoseParallelism,
211
211
logger: logger,
212
212
engine: &engine,
+1
-1
cmd/supercollider/main.go
+1
-1
cmd/supercollider/main.go
+25
-25
testing/integ_test.go
+25
-25
testing/integ_test.go
···
23
23
log.SetAllLoggers(log.LevelInfo)
24
24
}
25
25
26
-
func TestBGSBasic(t *testing.T) {
26
+
func TestRelayBasic(t *testing.T) {
27
27
if testing.Short() {
28
-
t.Skip("skipping BGS test in 'short' test mode")
28
+
t.Skip("skipping Relay test in 'short' test mode")
29
29
}
30
30
assert := assert.New(t)
31
31
didr := TestPLC(t)
32
32
p1 := MustSetupPDS(t, ".tpds", didr)
33
33
p1.Run(t)
34
34
35
-
b1 := MustSetupBGS(t, didr)
35
+
b1 := MustSetupRelay(t, didr)
36
36
b1.Run(t)
37
37
38
38
b1.tr.TrialHosts = []string{p1.RawHost()}
···
114
114
return posts
115
115
}
116
116
117
-
func TestBGSMultiPDS(t *testing.T) {
117
+
func TestRelayMultiPDS(t *testing.T) {
118
118
if testing.Short() {
119
-
t.Skip("skipping BGS test in 'short' test mode")
119
+
t.Skip("skipping Relay test in 'short' test mode")
120
120
}
121
121
//t.Skip("test too sleepy to run in CI for now")
122
122
···
129
129
p2 := MustSetupPDS(t, ".pdsdos", didr)
130
130
p2.Run(t)
131
131
132
-
b1 := MustSetupBGS(t, didr)
132
+
b1 := MustSetupRelay(t, didr)
133
133
b1.Run(t)
134
134
135
135
b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()}
···
158
158
159
159
users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life")
160
160
161
-
// now if we make posts on pds 2, the bgs will not hear about those new posts
161
+
// now if we make posts on pds 2, the relay will not hear about those new posts
162
162
163
163
p2posts2 := socialSim(t, users2, 10, 10)
164
164
···
168
168
p2.BumpLimits(t, b1)
169
169
time.Sleep(time.Millisecond * 50)
170
170
171
-
// Now, the bgs will discover a gap, and have to catch up somehow
171
+
// Now, the relay will discover a gap, and have to catch up somehow
172
172
socialSim(t, users2, 1, 0)
173
173
174
174
time.Sleep(time.Second)
175
175
176
-
// we expect the bgs to learn about posts that it did not directly see from
176
+
// we expect the relay to learn about posts that it did not directly see from
177
177
// repos it's already partially scraped, as long as it's seen *something* after the missing post
178
178
// this is the 'catchup' process
179
179
ctx := context.Background()
···
183
183
}
184
184
}
185
185
186
-
func TestBGSMultiGap(t *testing.T) {
186
+
func TestRelayMultiGap(t *testing.T) {
187
187
if testing.Short() {
188
-
t.Skip("skipping BGS test in 'short' test mode")
188
+
t.Skip("skipping Relay test in 'short' test mode")
189
189
}
190
190
//t.Skip("test too sleepy to run in CI for now")
191
191
assert := assert.New(t)
···
197
197
p2 := MustSetupPDS(t, ".pdsdos", didr)
198
198
p2.Run(t)
199
199
200
-
b1 := MustSetupBGS(t, didr)
200
+
b1 := MustSetupRelay(t, didr)
201
201
b1.Run(t)
202
202
203
203
b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()}
···
223
223
t.Fatal(err)
224
224
}
225
225
226
-
// now if we make posts on pds 2, the bgs will not hear about those new posts
226
+
// now if we make posts on pds 2, the relay will not hear about those new posts
227
227
228
228
p2posts2 := socialSim(t, users2, 10, 0)
229
229
···
233
233
p2.BumpLimits(t, b1)
234
234
time.Sleep(time.Second * 2)
235
235
236
-
// Now, the bgs will discover a gap, and have to catch up somehow
236
+
// Now, the relay will discover a gap, and have to catch up somehow
237
237
socialSim(t, users2, 1, 0)
238
238
239
239
time.Sleep(time.Second * 2)
240
240
241
-
// we expect the bgs to learn about posts that it did not directly see from
241
+
// we expect the relay to learn about posts that it did not directly see from
242
242
// repos it's already partially scraped, as long as it's seen *something* after the missing post
243
243
// this is the 'catchup' process
244
244
_, err = b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri)
···
255
255
p1 := MustSetupPDS(t, ".pdsuno", didr)
256
256
p1.Run(t)
257
257
258
-
b1 := MustSetupBGS(t, didr)
258
+
b1 := MustSetupRelay(t, didr)
259
259
b1.Run(t)
260
260
261
261
b1.tr.TrialHosts = []string{p1.RawHost()}
···
268
268
269
269
u := p1.MustNewUser(t, usernames[0]+".pdsuno")
270
270
271
-
// if the handle changes before the bgs processes the first event, things
271
+
// if the handle changes before the relay processes the first event, things
272
272
// get a little weird
273
273
time.Sleep(time.Millisecond * 50)
274
274
//socialSim(t, []*testUser{u}, 10, 0)
···
285
285
fmt.Println(idevt.RepoIdentity)
286
286
}
287
287
288
-
func TestBGSTakedown(t *testing.T) {
288
+
func TestRelayTakedown(t *testing.T) {
289
289
if testing.Short() {
290
-
t.Skip("skipping BGS test in 'short' test mode")
290
+
t.Skip("skipping Relay test in 'short' test mode")
291
291
}
292
292
assert := assert.New(t)
293
293
_ = assert
···
296
296
p1 := MustSetupPDS(t, ".tpds", didr)
297
297
p1.Run(t)
298
298
299
-
b1 := MustSetupBGS(t, didr)
299
+
b1 := MustSetupRelay(t, didr)
300
300
b1.Run(t)
301
301
302
302
b1.tr.TrialHosts = []string{p1.RawHost()}
···
371
371
372
372
func TestDomainBans(t *testing.T) {
373
373
if testing.Short() {
374
-
t.Skip("skipping BGS test in 'short' test mode")
374
+
t.Skip("skipping Relay test in 'short' test mode")
375
375
}
376
376
didr := TestPLC(t)
377
377
378
-
b1 := MustSetupBGS(t, didr)
378
+
b1 := MustSetupRelay(t, didr)
379
379
b1.Run(t)
380
380
381
381
b1.BanDomain(t, "foo.com")
···
409
409
}
410
410
}
411
411
412
-
func TestBGSHandleEmptyEvent(t *testing.T) {
412
+
func TestRelayHandleEmptyEvent(t *testing.T) {
413
413
if testing.Short() {
414
-
t.Skip("skipping BGS test in 'short' test mode")
414
+
t.Skip("skipping Relay test in 'short' test mode")
415
415
}
416
416
assert := assert.New(t)
417
417
didr := TestPLC(t)
418
418
p1 := MustSetupPDS(t, ".tpds", didr)
419
419
p1.Run(t)
420
420
421
-
b1 := MustSetupBGS(t, didr)
421
+
b1 := MustSetupRelay(t, didr)
422
422
b1.Run(t)
423
423
424
424
b1.tr.TrialHosts = []string{p1.RawHost()}
+15
-15
testing/utils.go
+15
-15
testing/utils.go
···
164
164
}
165
165
}
166
166
167
-
func (tp *TestPDS) RequestScraping(t *testing.T, b *TestBGS) {
167
+
func (tp *TestPDS) RequestScraping(t *testing.T, b *TestRelay) {
168
168
t.Helper()
169
169
170
170
c := &xrpc.Client{Host: "http://" + b.Host()}
···
173
173
}
174
174
}
175
175
176
-
func (tp *TestPDS) BumpLimits(t *testing.T, b *TestBGS) {
176
+
func (tp *TestPDS) BumpLimits(t *testing.T, b *TestRelay) {
177
177
t.Helper()
178
178
179
179
err := b.bgs.CreateAdminToken("test")
···
413
413
return plc.NewFakeDid(db)
414
414
}
415
415
416
-
type TestBGS struct {
416
+
type TestRelay struct {
417
417
bgs *bgs.BGS
418
418
tr *api.TestHandleResolver
419
419
db *gorm.DB
420
420
421
-
// listener is owned by by the BGS structure and should be closed by
422
-
// shutting down the BGS.
421
+
// listener is owned by the Relay structure and should be closed by
422
+
// shutting down the Relay.
423
423
listener net.Listener
424
424
}
425
425
426
-
func (t *TestBGS) Host() string {
426
+
func (t *TestRelay) Host() string {
427
427
return t.listener.Addr().String()
428
428
}
429
429
430
-
func MustSetupBGS(t *testing.T, didr plc.PLCClient) *TestBGS {
430
+
func MustSetupRelay(t *testing.T, didr plc.PLCClient) *TestRelay {
431
431
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
432
432
defer cancel()
433
-
tbgs, err := SetupBGS(ctx, didr)
433
+
tbgs, err := SetupRelay(ctx, didr)
434
434
if err != nil {
435
435
t.Fatal(err)
436
436
}
···
438
438
return tbgs
439
439
}
440
440
441
-
func SetupBGS(ctx context.Context, didr plc.PLCClient) (*TestBGS, error) {
441
+
func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) {
442
442
dir, err := os.MkdirTemp("", "integtest")
443
443
if err != nil {
444
444
return nil, err
···
485
485
486
486
repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) {
487
487
if err := ix.HandleRepoEvent(ctx, evt); err != nil {
488
-
fmt.Println("test bgs failed to handle repo event", err)
488
+
fmt.Println("test relay failed to handle repo event", err)
489
489
}
490
-
}, true) // TODO: actually want this to be false, but some tests use this to confirm the BGS has seen certain records
490
+
}, true) // TODO: actually want this to be false, but some tests use this to confirm the Relay has seen certain records
491
491
492
492
tr := &api.TestHandleResolver{}
493
493
···
502
502
return nil, err
503
503
}
504
504
505
-
return &TestBGS{
505
+
return &TestRelay{
506
506
db: maindb,
507
507
bgs: b,
508
508
tr: tr,
···
510
510
}, nil
511
511
}
512
512
513
-
func (b *TestBGS) Run(t *testing.T) {
513
+
func (b *TestRelay) Run(t *testing.T) {
514
514
go func() {
515
515
if err := b.bgs.StartWithListener(b.listener); err != nil {
516
516
fmt.Println(err)
···
519
519
time.Sleep(time.Millisecond * 10)
520
520
}
521
521
522
-
func (b *TestBGS) BanDomain(t *testing.T, d string) {
522
+
func (b *TestRelay) BanDomain(t *testing.T, d string) {
523
523
t.Helper()
524
524
525
525
if err := b.db.Create(&models.DomainBan{
···
537
537
Cur int
538
538
}
539
539
540
-
func (b *TestBGS) Events(t *testing.T, since int64) *EventStream {
540
+
func (b *TestRelay) Events(t *testing.T, since int64) *EventStream {
541
541
d := websocket.Dialer{}
542
542
h := http.Header{}
543
543