+85
README.md
+85
README.md
···
109
109
110
110
The frontend will be available at http://localhost:3000 and will connect to the API at http://localhost:4444.
111
111
112
+
## Running the Bluesky App against Konbini
113
+
114
+
Konbini implements a large portion of the app.bsky.\* appview endpoints that
115
+
are required for pointing the main app to it and having it work reasonably
116
+
well.
117
+
118
+
To accomplish this you will need a few things:
119
+
120
+
### Service DID
121
+
122
+
You will need a DID, preferably a did:web for your appview that points at a
123
+
public endpoint where your appview is accessible via https.
124
+
I'll get into the https proxy next, but for the did, I've just pointed a domain
125
+
I own (in my case appview1.bluesky.day) to a VPS, and used caddy to host a file
126
+
at `/.well-known/did.json`.
127
+
That file should look like this:
128
+
129
+
```json
130
+
{
131
+
"@context": [
132
+
"https://www.w3.org/ns/did/v1",
133
+
"https://w3id.org/security/multikey/v1"
134
+
],
135
+
"id": "did:web:appview1.bluesky.day",
136
+
"verificationMethod": [
137
+
{
138
+
"id": "did:web:api.bsky.app#atproto",
139
+
"type": "Multikey",
140
+
"controller": "did:web:api.bsky.app",
141
+
"publicKeyMultibase": "zQ3shpRzb2NDriwCSSsce6EqGxG23kVktHZc57C3NEcuNy1jg"
142
+
}
143
+
],
144
+
"service": [
145
+
{
146
+
"id": "#bsky_notif",
147
+
"type": "BskyNotificationService",
148
+
"serviceEndpoint": "YOUR APPVIEW HTTPS URL"
149
+
},
150
+
{
151
+
"id": "#bsky_appview",
152
+
"type": "BskyAppView",
153
+
"serviceEndpoint": "YOUR APPVIEW HTTPS URL"
154
+
}
155
+
]
156
+
}
157
+
```
158
+
159
+
The verificationMethod isn't used but i'm not sure if _something_ is required
160
+
there or not, so i'm just leaving that there, it works on my machine.
161
+
162
+
### HTTPS Endpoint
163
+
164
+
I've been using ngrok to proxy traffic from a publicly accessible https url to my appview.
165
+
You can simply run `ngrok http 4446` and it will give you an https url that you
166
+
can then put in your DID doc above.
167
+
168
+
### The Social App
169
+
170
+
Now, clone and build the social app:
171
+
172
+
```
173
+
git clone https://github.com/bluesky-social/social-app
174
+
cd social-app
175
+
yarn
176
+
```
177
+
178
+
And then set this environment variable that tells it to use your appview:
179
+
180
+
```
181
+
export EXPO_PUBLIC_BLUESKY_PROXY_DID=did:web:YOURDIDWEB
182
+
```
183
+
184
+
And finally run the app:
185
+
186
+
```
187
+
yarn web
188
+
```
189
+
190
+
This takes a while on first load since its building everything.
191
+
After that, load the localhost url it gives you and it _should_ work.
192
+
112
193
## License
113
194
114
195
MIT (whyrusleeping)
196
+
197
+
```
198
+
199
+
```
+33
-12
main.go
+33
-12
main.go
···
1
1
package main
2
2
3
3
import (
4
+
"bytes"
4
5
"context"
5
6
"fmt"
6
7
"log"
···
19
20
"github.com/bluesky-social/indigo/atproto/syntax"
20
21
"github.com/bluesky-social/indigo/cmd/relay/stream"
21
22
"github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
23
+
"github.com/bluesky-social/indigo/repo"
22
24
"github.com/bluesky-social/indigo/util/cliutil"
23
25
xrpclib "github.com/bluesky-social/indigo/xrpc"
24
26
"github.com/gorilla/websocket"
···
153
155
client: cc,
154
156
dir: dir,
155
157
156
-
missingProfiles: make(chan string, 1024),
157
-
missingPosts: make(chan string, 1024),
158
-
missingFeedGenerators: make(chan string, 1024),
158
+
missingRecords: make(chan MissingRecord, 1024),
159
159
}
160
160
fmt.Println("MY DID: ", s.mydid)
161
161
···
200
200
http.ListenAndServe(":4445", nil)
201
201
}()
202
202
203
-
go s.missingProfileFetcher()
204
-
go s.missingPostFetcher()
205
-
go s.missingFeedGeneratorFetcher()
203
+
go s.missingRecordFetcher()
206
204
207
205
seqno, err := loadLastSeq(db, "firehose_seq")
208
206
if err != nil {
···
227
225
seqLk sync.Mutex
228
226
lastSeq int64
229
227
230
-
mpLk sync.Mutex
231
-
missingProfiles chan string
232
-
missingPosts chan string
233
-
missingFeedGenerators chan string
228
+
mpLk sync.Mutex
229
+
missingRecords chan MissingRecord
234
230
}
235
231
236
232
func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
···
369
365
return err
370
366
}
371
367
372
-
_ = resp
373
-
return nil
368
+
c := &xrpclib.Client{
369
+
Host: resp.PDSEndpoint(),
370
+
}
371
+
372
+
repob, err := atproto.SyncGetRepo(ctx, c, did, "")
373
+
if err != nil {
374
+
return err
375
+
}
376
+
377
+
rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob))
378
+
if err != nil {
379
+
return err
380
+
}
381
+
382
+
return rep.ForEach(ctx, "", func(k string, v cid.Cid) error {
383
+
blk, err := rep.Blockstore().Get(ctx, v)
384
+
if err != nil {
385
+
slog.Error("record missing in repo", "path", k, "cid", v, "error", err)
386
+
return nil
387
+
}
388
+
389
+
d := blk.RawData()
390
+
if err := s.backend.HandleCreate(ctx, did, "", k, &d, &v); err != nil {
391
+
slog.Error("failed to index record", "path", k, "cid", v, "error", err)
392
+
}
393
+
return nil
394
+
})
374
395
375
396
}
+61
-47
missing.go
+61
-47
missing.go
···
5
5
"context"
6
6
"fmt"
7
7
"log/slog"
8
-
"strings"
9
8
10
9
"github.com/bluesky-social/indigo/api/atproto"
11
10
"github.com/bluesky-social/indigo/api/bsky"
12
11
"github.com/bluesky-social/indigo/atproto/syntax"
13
12
xrpclib "github.com/bluesky-social/indigo/xrpc"
14
13
"github.com/ipfs/go-cid"
15
-
"github.com/labstack/gommon/log"
16
14
)
17
15
18
-
func (s *Server) addMissingProfile(ctx context.Context, did string) {
16
+
type MissingRecordType string
17
+
18
+
const (
19
+
MissingRecordTypeProfile MissingRecordType = "profile"
20
+
MissingRecordTypePost MissingRecordType = "post"
21
+
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
+
)
23
+
24
+
type MissingRecord struct {
25
+
Type MissingRecordType
26
+
Identifier string // DID for profiles, AT-URI for posts/feedgens
27
+
}
28
+
29
+
func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) {
19
30
select {
20
-
case s.missingProfiles <- did:
31
+
case s.missingRecords <- rec:
21
32
case <-ctx.Done():
22
33
}
23
34
}
24
35
25
-
func (s *Server) missingProfileFetcher() {
26
-
for did := range s.missingProfiles {
27
-
if err := s.fetchMissingProfile(context.TODO(), did); err != nil {
28
-
log.Warn("failed to fetch missing profile", "did", did, "error", err)
36
+
// Legacy methods for backward compatibility
37
+
func (s *Server) addMissingProfile(ctx context.Context, did string) {
38
+
s.addMissingRecord(ctx, MissingRecord{
39
+
Type: MissingRecordTypeProfile,
40
+
Identifier: did,
41
+
})
42
+
}
43
+
44
+
func (s *Server) addMissingPost(ctx context.Context, uri string) {
45
+
slog.Info("adding missing post to fetch queue", "uri", uri)
46
+
s.addMissingRecord(ctx, MissingRecord{
47
+
Type: MissingRecordTypePost,
48
+
Identifier: uri,
49
+
})
50
+
}
51
+
52
+
func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
53
+
slog.Info("adding missing feed generator to fetch queue", "uri", uri)
54
+
s.addMissingRecord(ctx, MissingRecord{
55
+
Type: MissingRecordTypeFeedGenerator,
56
+
Identifier: uri,
57
+
})
58
+
}
59
+
60
+
func (s *Server) missingRecordFetcher() {
61
+
for rec := range s.missingRecords {
62
+
var err error
63
+
switch rec.Type {
64
+
case MissingRecordTypeProfile:
65
+
err = s.fetchMissingProfile(context.TODO(), rec.Identifier)
66
+
case MissingRecordTypePost:
67
+
err = s.fetchMissingPost(context.TODO(), rec.Identifier)
68
+
case MissingRecordTypeFeedGenerator:
69
+
err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
70
+
default:
71
+
slog.Error("unknown missing record type", "type", rec.Type)
72
+
continue
73
+
}
74
+
75
+
if err != nil {
76
+
slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
29
77
}
30
78
}
31
79
}
···
68
116
return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
69
117
}
70
118
71
-
func (s *Server) addMissingPost(ctx context.Context, uri string) {
72
-
slog.Info("adding missing post to fetch queue", "uri", uri)
73
-
select {
74
-
case s.missingPosts <- uri:
75
-
case <-ctx.Done():
76
-
}
77
-
}
78
-
79
-
func (s *Server) missingPostFetcher() {
80
-
for uri := range s.missingPosts {
81
-
if err := s.fetchMissingPost(context.TODO(), uri); err != nil {
82
-
log.Warn("failed to fetch missing post", "uri", uri, "error", err)
83
-
}
84
-
}
85
-
}
86
-
87
119
func (s *Server) fetchMissingPost(ctx context.Context, uri string) error {
88
-
// Parse AT URI: at://did:plc:xxx/app.bsky.feed.post/rkey
89
-
parts := strings.Split(uri, "/")
90
-
if len(parts) < 5 || !strings.HasPrefix(parts[2], "did:") {
120
+
puri, err := syntax.ParseATURI(uri)
121
+
if err != nil {
91
122
return fmt.Errorf("invalid AT URI: %s", uri)
92
123
}
93
124
94
-
did := parts[2]
95
-
collection := parts[3]
96
-
rkey := parts[4]
125
+
did := puri.Authority().String()
126
+
collection := puri.Collection().String()
127
+
rkey := puri.RecordKey().String()
97
128
98
129
repo, err := s.backend.getOrCreateRepo(ctx, did)
99
130
if err != nil {
···
132
163
return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
133
164
}
134
165
135
-
func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
136
-
slog.Info("adding missing feed generator to fetch queue", "uri", uri)
137
-
select {
138
-
case s.missingFeedGenerators <- uri:
139
-
case <-ctx.Done():
140
-
}
141
-
}
142
-
143
-
func (s *Server) missingFeedGeneratorFetcher() {
144
-
for uri := range s.missingFeedGenerators {
145
-
if err := s.fetchMissingFeedGenerator(context.TODO(), uri); err != nil {
146
-
log.Warn("failed to fetch missing feed generator", "uri", uri, "error", err)
147
-
}
148
-
}
149
-
}
150
-
151
166
func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
152
-
// Parse AT URI: at://did:plc:xxx/app.bsky.feed.generator/rkey
153
167
puri, err := syntax.ParseATURI(uri)
154
168
if err != nil {
155
169
return fmt.Errorf("invalid AT URI: %s", uri)
+31
-8
xrpc/feed/getFeed.go
+31
-8
xrpc/feed/getFeed.go
···
23
23
// Parse parameters
24
24
feedURI := c.QueryParam("feed")
25
25
if feedURI == "" {
26
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
26
+
return c.JSON(http.StatusBadRequest, map[string]any{
27
27
"error": "InvalidRequest",
28
28
"message": "feed parameter is required",
29
29
})
···
49
49
rkey := extractRkeyFromURI(feedURI)
50
50
51
51
if did == "" || rkey == "" {
52
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
52
+
return c.JSON(http.StatusBadRequest, map[string]any{
53
53
"error": "InvalidRequest",
54
54
"message": "invalid feed URI format",
55
55
})
···
65
65
66
66
if feedGen.ID == 0 {
67
67
hydrator.AddMissingFeedGenerator(feedURI)
68
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
68
+
return c.JSON(http.StatusNotFound, map[string]any{
69
69
"error": "NotFound",
70
70
"message": "feed generator not found",
71
71
})
···
75
75
var feedGenRecord bsky.FeedGenerator
76
76
if err := feedGenRecord.UnmarshalCBOR(bytes.NewReader(feedGen.Raw)); err != nil {
77
77
slog.Error("failed to decode feed generator record", "error", err)
78
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
78
+
return c.JSON(http.StatusInternalServerError, map[string]any{
79
79
"error": "InternalError",
80
80
"message": "failed to decode feed generator record",
81
81
})
···
85
85
serviceDID, err := syntax.ParseDID(feedGenRecord.Did)
86
86
if err != nil {
87
87
slog.Error("invalid service DID in feed generator", "error", err, "did", feedGenRecord.Did)
88
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
88
+
return c.JSON(http.StatusInternalServerError, map[string]any{
89
89
"error": "InternalError",
90
90
"message": "invalid service DID",
91
91
})
···
95
95
serviceIdent, err := dir.LookupDID(ctx, serviceDID)
96
96
if err != nil {
97
97
slog.Error("failed to resolve service DID", "error", err, "did", serviceDID)
98
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
98
+
return c.JSON(http.StatusInternalServerError, map[string]any{
99
99
"error": "InternalError",
100
100
"message": "failed to resolve service endpoint",
101
101
})
···
104
104
serviceEndpoint := serviceIdent.GetServiceEndpoint("bsky_fg")
105
105
if serviceEndpoint == "" {
106
106
slog.Error("service has no bsky_fg endpoint", "did", serviceDID)
107
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
107
+
return c.JSON(http.StatusInternalServerError, map[string]any{
108
108
"error": "InternalError",
109
109
"message": "service has no endpoint",
110
110
})
111
111
}
112
112
113
113
// Create XRPC client for the feed generator service
114
+
// Pass through headers from the original request so feed generators can
115
+
// customize feeds based on the viewer
116
+
headers := make(map[string]string)
117
+
118
+
// Set User-Agent to identify konbini
119
+
headers["User-Agent"] = "konbini/0.0.1"
120
+
121
+
// Pass through Authorization header if present (for authenticated feed requests)
122
+
if authHeader := c.Request().Header.Get("Authorization"); authHeader != "" {
123
+
headers["Authorization"] = authHeader
124
+
}
125
+
126
+
// Pass through Accept-Language header if present
127
+
if langHeader := c.Request().Header.Get("Accept-Language"); langHeader != "" {
128
+
headers["Accept-Language"] = langHeader
129
+
}
130
+
131
+
// Pass through X-Bsky-Topics header if present
132
+
if topicsHeader := c.Request().Header.Get("X-Bsky-Topics"); topicsHeader != "" {
133
+
headers["X-Bsky-Topics"] = topicsHeader
134
+
}
135
+
114
136
client := &xrpc.Client{
115
-
Host: serviceEndpoint,
137
+
Host: serviceEndpoint,
138
+
Headers: headers,
116
139
}
117
140
118
141
// Call getFeedSkeleton on the service