+1
-1
cmd/palomar/README.md
+1
-1
cmd/palomar/README.md
···
19
19
20
20
Palomar uses environment variables for configuration.
21
21
22
-
- `ATP_BGS_HOST`: URL of firehose to subscribe to, either global BGS or individual PDS (default: `wss://bsky.social`)
22
+
- `ATP_RELAY_HOST`: URL of firehose to subscribe to, either global Relay or individual PDS (default: `wss://bsky.network`)
23
23
- `ATP_PLC_HOST`: PLC directory for identity lookups (default: `https://plc.directory`)
24
24
- `DATABASE_URL`: connection string for database to persist firehose cursor subscription state
25
25
- `PALOMAR_BIND`: IP/port to have HTTP API listen on (default: `:3999`)
+9
-9
cmd/palomar/main.go
+9
-9
cmd/palomar/main.go
···
87
87
EnvVars: []string{"ES_PROFILE_INDEX"},
88
88
},
89
89
&cli.StringFlag{
90
-
Name: "atp-bgs-host",
91
-
Usage: "hostname and port of BGS to subscribe to",
92
-
Value: "wss://bsky.social",
93
-
EnvVars: []string{"ATP_BGS_HOST"},
90
+
Name: "atp-relay-host",
91
+
Usage: "hostname and port of Relay to subscribe to",
92
+
Value: "wss://bsky.network",
93
+
EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"},
94
94
},
95
95
&cli.StringFlag{
96
96
Name: "atp-plc-host",
···
141
141
EnvVars: []string{"PALOMAR_METRICS_LISTEN"},
142
142
},
143
143
&cli.IntFlag{
144
-
Name: "bgs-sync-rate-limit",
145
-
Usage: "max repo sync (checkout) requests per second to upstream (BGS)",
144
+
Name: "relay-sync-rate-limit",
145
+
Usage: "max repo sync (checkout) requests per second to upstream (Relay)",
146
146
Value: 8,
147
-
EnvVars: []string{"PALOMAR_BGS_SYNC_RATE_LIMIT"},
147
+
EnvVars: []string{"PALOMAR_RELAY_SYNC_RATE_LIMIT", "PALOMAR_BGS_SYNC_RATE_LIMIT"},
148
148
},
149
149
&cli.IntFlag{
150
150
Name: "index-max-concurrency",
···
233
233
escli,
234
234
&dir,
235
235
search.Config{
236
-
BGSHost: cctx.String("atp-bgs-host"),
236
+
RelayHost: cctx.String("atp-relay-host"),
237
237
ProfileIndex: cctx.String("es-profile-index"),
238
238
PostIndex: cctx.String("es-post-index"),
239
239
Logger: logger,
240
-
BGSSyncRateLimit: cctx.Int("bgs-sync-rate-limit"),
240
+
RelaySyncRateLimit: cctx.Int("relay-sync-rate-limit"),
241
241
IndexMaxConcurrency: cctx.Int("index-max-concurrency"),
242
242
},
243
243
)
+5
-5
search/firehose.go
+5
-5
search/firehose.go
···
58
58
}
59
59
60
60
d := websocket.DefaultDialer
61
-
u, err := url.Parse(s.bgshost)
61
+
u, err := url.Parse(s.relayHost)
62
62
if err != nil {
63
-
return fmt.Errorf("invalid bgshost URI: %w", err)
63
+
return fmt.Errorf("invalid relayHost URI: %w", err)
64
64
}
65
65
u.Path = "xrpc/com.atproto.sync.subscribeRepos"
66
66
if cur != 0 {
···
132
132
return events.HandleRepoStream(
133
133
ctx, con, autoscaling.NewScheduler(
134
134
autoscaling.DefaultAutoscaleSettings(),
135
-
s.bgshost,
135
+
s.relayHost,
136
136
rsc.EventHandler,
137
137
),
138
138
)
···
150
150
totalErrored := 0
151
151
152
152
for {
153
-
resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, cursor, limit)
153
+
resp, err := comatproto.SyncListRepos(ctx, s.relayClient, cursor, limit)
154
154
if err != nil {
155
155
log.Error("failed to list repos", "err", err)
156
156
time.Sleep(5 * time.Second)
···
260
260
}
261
261
262
262
func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
263
-
repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "")
263
+
repodata, err := comatproto.SyncGetRepo(ctx, s.relayClient, evt.Repo, "")
264
264
if err != nil {
265
265
return err
266
266
}
+16
-16
search/server.go
+16
-16
search/server.go
···
29
29
postIndex string
30
30
profileIndex string
31
31
db *gorm.DB
32
-
bgshost string
33
-
bgsxrpc *xrpc.Client
32
+
relayHost string
33
+
relayClient *xrpc.Client
34
34
dir identity.Directory
35
35
echo *echo.Echo
36
36
logger *slog.Logger
···
47
47
}
48
48
49
49
type Config struct {
50
-
BGSHost string
50
+
RelayHost string
51
51
ProfileIndex string
52
52
PostIndex string
53
53
Logger *slog.Logger
54
-
BGSSyncRateLimit int
54
+
RelaySyncRateLimit int
55
55
IndexMaxConcurrency int
56
56
DiscoverRepos bool
57
57
}
···
68
68
db.AutoMigrate(&LastSeq{})
69
69
db.AutoMigrate(&backfill.GormDBJob{})
70
70
71
-
bgsws := config.BGSHost
72
-
if !strings.HasPrefix(bgsws, "ws") {
73
-
return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'")
71
+
relayws := config.RelayHost
72
+
if !strings.HasPrefix(relayws, "ws") {
73
+
return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'")
74
74
}
75
75
76
-
bgshttp := strings.Replace(bgsws, "ws", "http", 1)
77
-
bgsxrpc := &xrpc.Client{
78
-
Host: bgshttp,
76
+
relayhttp := strings.Replace(relayws, "ws", "http", 1)
77
+
relayClient := &xrpc.Client{
78
+
Host: relayhttp,
79
79
}
80
80
81
81
s := &Server{
···
83
83
profileIndex: config.ProfileIndex,
84
84
postIndex: config.PostIndex,
85
85
db: db,
86
-
bgshost: config.BGSHost, // NOTE: the original URL, not 'bgshttp'
87
-
bgsxrpc: bgsxrpc,
86
+
relayHost: config.RelayHost, // NOTE: the original URL, not 'relayhttp'
87
+
relayClient: relayClient,
88
88
dir: dir,
89
89
logger: logger,
90
90
enableRepoDiscovery: config.DiscoverRepos,
···
92
92
93
93
bfstore := backfill.NewGormstore(db)
94
94
opts := backfill.DefaultBackfillOptions()
95
-
if config.BGSSyncRateLimit > 0 {
96
-
opts.SyncRequestsPerSecond = config.BGSSyncRateLimit
97
-
opts.ParallelBackfills = 2 * config.BGSSyncRateLimit
95
+
if config.RelaySyncRateLimit > 0 {
96
+
opts.SyncRequestsPerSecond = config.RelaySyncRateLimit
97
+
opts.ParallelBackfills = 2 * config.RelaySyncRateLimit
98
98
} else {
99
99
opts.SyncRequestsPerSecond = 8
100
100
}
101
-
opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", bgshttp)
101
+
opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", relayhttp)
102
102
if config.IndexMaxConcurrency > 0 {
103
103
opts.ParallelRecordCreates = config.IndexMaxConcurrency
104
104
} else {