+2
-1
.gitignore
+2
-1
.gitignore
+27
-5
cmd/aturilist/client/client.go
+27
-5
cmd/aturilist/client/client.go
···
13
13
14
14
// Constants for the XRPC methods
15
15
const (
16
-
MethodListRecords = "app.reddwarf.aturilist.listRecords"
17
-
MethodCountRecords = "app.reddwarf.aturilist.countRecords"
18
-
MethodIndexRecord = "app.reddwarf.aturilist.indexRecord"
19
-
MethodValidateRecord = "app.reddwarf.aturilist.validateRecord"
20
-
DefaultProductionHost = "https://aturilist.reddwarf.app"
16
+
MethodListRecords = "app.reddwarf.aturilist.listRecords"
17
+
MethodCountRecords = "app.reddwarf.aturilist.countRecords"
18
+
MethodIndexRecord = "app.reddwarf.aturilist.indexRecord"
19
+
MethodValidateRecord = "app.reddwarf.aturilist.validateRecord"
20
+
MethodQueryCollectionRkey = "app.reddwarf.aturilist.queryCollectionRkey"
21
+
DefaultProductionHost = "https://aturilist.reddwarf.app"
21
22
)
22
23
23
24
// Client is the API client for the Red Dwarf AtURI List Service.
···
53
54
Repo string `json:"repo"`
54
55
Collection string `json:"collection"`
55
56
Count int `json:"count"`
57
+
}
58
+
59
+
type QueryCollectionRkeyResponse struct {
60
+
Collection string `json:"collection"`
61
+
RKey string `json:"rkey"`
62
+
DIDs []string `json:"dids"`
63
+
Count int `json:"count"`
56
64
}
57
65
58
66
type ErrorResponse struct {
···
138
146
}
139
147
140
148
return true, nil
149
+
}
150
+
151
+
// QueryCollectionRkey returns a list of DIDs that have a specific collection and rkey pair.
152
+
func (c *Client) QueryCollectionRkey(ctx context.Context, collection, rkey string) (*QueryCollectionRkeyResponse, error) {
153
+
params := url.Values{}
154
+
params.Set("collection", collection)
155
+
params.Set("rkey", rkey)
156
+
157
+
var resp QueryCollectionRkeyResponse
158
+
if err := c.doRequest(ctx, http.MethodGet, MethodQueryCollectionRkey, params, nil, &resp); err != nil {
159
+
return nil, err
160
+
}
161
+
162
+
return &resp, nil
141
163
}
142
164
143
165
// --- Internal Helpers ---
+127
-53
cmd/aturilist/main.go
+127
-53
cmd/aturilist/main.go
···
2
2
3
3
import (
4
4
"context"
5
+
"encoding/json"
5
6
"errors"
6
7
"flag"
7
8
"fmt"
···
20
21
"github.com/dgraph-io/badger/v4"
21
22
"github.com/gin-gonic/gin"
22
23
23
-
// Restored your specific imports
24
24
"tangled.org/whey.party/red-dwarf-server/auth"
25
25
"tangled.org/whey.party/red-dwarf-server/microcosm"
26
26
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
···
30
30
db *badger.DB
31
31
logger *slog.Logger
32
32
33
-
// Locks for specific operations if needed, though Badger is thread-safe
34
33
backfillTracker map[string]*sync.WaitGroup
35
34
backfillMutex sync.Mutex
36
35
}
···
73
72
74
73
initURLs(*prod)
75
74
76
-
// 1. Initialize DB
77
75
db, err := badger.Open(badger.DefaultOptions(*dbPath))
78
76
if err != nil {
79
77
logger.Error("Failed to open BadgerDB", "error", err)
···
86
84
logger: logger,
87
85
}
88
86
89
-
// 2. Initialize Auth
90
87
auther, err := auth.NewAuth(
91
88
100_000,
92
89
time.Hour*12,
93
90
5,
94
-
serviceWebDID, //+"#bsky_appview",
91
+
serviceWebDID,
95
92
)
96
93
if err != nil {
97
94
log.Fatalf("Failed to create Auth: %v", err)
98
95
}
99
96
100
-
// 3. Initialize Clients
101
97
ctx := context.Background()
102
98
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
103
99
104
-
// 4. Initialize Jetstream
105
100
config := client.DefaultClientConfig()
106
101
config.WebsocketURL = JETSTREAM_URL
107
102
config.Compress = true
···
115
110
return
116
111
}
117
112
118
-
// Connect with cursor (5 minutes ago)
119
113
cursor := time.Now().Add(-5 * time.Minute).UnixMicro()
120
114
121
115
go func() {
122
116
logger.Info("Connecting to Jetstream...")
123
-
/*
124
-
If you resume a jetstream firehose from a cursor, everything works fine until you catch up to real time.
125
-
At that point, the connection drops. If you connect without a cursor (going straight to realtime), it keeps working.
126
-
*/
127
117
for {
128
118
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
129
119
logger.Error("jetstream connection disconnected", "error", err)
···
131
121
132
122
select {
133
123
case <-ctx.Done():
134
-
return // Context cancelled, exit loop
124
+
return
135
125
default:
136
126
logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor)
137
127
time.Sleep(5 * time.Second)
···
139
129
}
140
130
}()
141
131
142
-
// 5. Initialize Router
143
132
router := gin.New()
144
133
router.Use(auther.AuthenticateGinRequestViaJWT)
145
134
···
147
136
148
137
router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords)
149
138
150
-
// heavily rate limited because can be used for spam.
151
139
router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) {
152
140
srv.handleIndexRecord(c, sl)
153
141
})
154
142
155
143
router.POST("/xrpc/app.reddwarf.aturilist.validateRecord", srv.handleValidateRecord)
156
144
145
+
router.GET("/xrpc/app.reddwarf.aturilist.queryCollectionRkey", srv.handleQueryCollectionRkey)
146
+
157
147
// router.GET("/xrpc/app.reddwarf.aturilist.requestBackfill", )
158
148
159
149
router.Run(":7155")
160
150
}
161
-
162
-
// --- Jetstream Handler ---
163
151
164
152
type JetstreamHandler struct {
165
153
srv *Server
···
168
156
func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error {
169
157
if event != nil {
170
158
if event.Commit != nil {
171
-
// Identify Delete operation
172
159
isDelete := event.Commit.Operation == models.CommitOperationDelete
173
160
174
-
// Process
175
161
h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete)
176
162
177
163
}
178
164
}
179
165
return nil
180
166
}
181
-
182
-
// --- DB Helpers ---
183
167
184
168
func makeKey(repo, collection, rkey string) []byte {
185
169
return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey))
···
193
177
return parts[0], parts[1], parts[2], nil
194
178
}
195
179
196
-
// processRecord handles the DB write/delete.
197
-
// isDelete=true removes the key. isDelete=false sets the key.
180
+
func makeCollectionRkeyKey(collection, rkey string) []byte {
181
+
return []byte(fmt.Sprintf("cr|%s|%s|", collection, rkey))
182
+
}
183
+
184
+
func parseCollectionRkeyKey(key []byte) (collection, rkey string, err error) {
185
+
parts := strings.Split(string(key), "|")
186
+
if len(parts) < 3 || parts[0] != "cr" {
187
+
return "", "", errors.New("invalid collection+rkey key format")
188
+
}
189
+
return parts[1], parts[2], nil
190
+
}
191
+
198
192
func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) {
199
193
key := makeKey(repo, collection, rkey)
194
+
crKey := makeCollectionRkeyKey(collection, rkey)
200
195
201
196
err := s.db.Update(func(txn *badger.Txn) error {
202
197
if isDelete {
203
-
return txn.Delete(key)
198
+
if err := txn.Delete(key); err != nil {
199
+
return err
200
+
}
201
+
return s.removeDidFromCollectionRkeyIndex(txn, crKey, repo)
204
202
}
205
-
// On create/update, store current timestamp.
206
-
// You can store more data (Cid, etc) here if needed later.
207
-
return txn.Set(key, []byte(time.Now().Format(time.RFC3339)))
203
+
if err := txn.Set(key, []byte(time.Now().Format(time.RFC3339))); err != nil {
204
+
return err
205
+
}
206
+
return s.addDidToCollectionRkeyIndex(txn, crKey, repo)
208
207
})
209
208
210
209
if err != nil {
···
212
211
}
213
212
}
214
213
215
-
// --- HTTP Handlers ---
214
+
func (s *Server) addDidToCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error {
215
+
item, err := txn.Get(crKey)
216
+
if err == badger.ErrKeyNotFound {
217
+
var dids []string
218
+
dids = append(dids, did)
219
+
didsJSON, _ := json.Marshal(dids)
220
+
return txn.Set(crKey, didsJSON)
221
+
} else if err != nil {
222
+
return err
223
+
}
224
+
225
+
var dids []string
226
+
err = item.Value(func(val []byte) error {
227
+
return json.Unmarshal(val, &dids)
228
+
})
229
+
if err != nil {
230
+
return err
231
+
}
232
+
233
+
for _, existingDid := range dids {
234
+
if existingDid == did {
235
+
return nil
236
+
}
237
+
}
238
+
239
+
dids = append(dids, did)
240
+
didsJSON, _ := json.Marshal(dids)
241
+
return txn.Set(crKey, didsJSON)
242
+
}
243
+
244
+
func (s *Server) removeDidFromCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error {
245
+
item, err := txn.Get(crKey)
246
+
if err == badger.ErrKeyNotFound {
247
+
return nil
248
+
} else if err != nil {
249
+
return err
250
+
}
251
+
252
+
var dids []string
253
+
err = item.Value(func(val []byte) error {
254
+
return json.Unmarshal(val, &dids)
255
+
})
256
+
if err != nil {
257
+
return err
258
+
}
259
+
260
+
var newDids []string
261
+
for _, existingDid := range dids {
262
+
if existingDid != did {
263
+
newDids = append(newDids, existingDid)
264
+
}
265
+
}
266
+
267
+
if len(newDids) == 0 {
268
+
return txn.Delete(crKey)
269
+
}
270
+
271
+
didsJSON, _ := json.Marshal(newDids)
272
+
return txn.Set(crKey, didsJSON)
273
+
}
216
274
217
275
func (s *Server) handleListRecords(c *gin.Context) {
218
276
repo := c.Query("repo")
219
277
collection := c.Query("collection")
220
278
cursor := c.Query("cursor")
221
-
reverse := c.Query("reverse") == "true" // 1. Check param
279
+
reverse := c.Query("reverse") == "true"
222
280
limit := 50
223
281
224
282
if repo == "" || collection == "" {
···
226
284
return
227
285
}
228
286
229
-
// Base prefix: "repo|collection|"
230
287
prefixStr := fmt.Sprintf("%s|%s|", repo, collection)
231
288
prefix := []byte(prefixStr)
232
289
···
234
291
var lastRkey string
235
292
236
293
err := s.db.View(func(txn *badger.Txn) error {
237
-
// 2. Configure Iterator Options
238
294
opts := badger.DefaultIteratorOptions
239
295
opts.PrefetchValues = false
240
-
opts.Reverse = reverse // Set reverse mode
296
+
opts.Reverse = reverse
241
297
242
298
it := txn.NewIterator(opts)
243
299
defer it.Close()
244
300
245
-
// 3. Determine Start Key
246
301
var startKey []byte
247
302
if cursor != "" {
248
-
// If cursor exists, we seek to it regardless of direction
249
303
startKey = makeKey(repo, collection, cursor)
250
304
} else {
251
305
if reverse {
252
-
// REVERSE START: "repo|collection|" + 0xFF
253
-
// This seeks to the theoretical end of this prefix range
254
306
startKey = append([]byte(prefixStr), 0xFF)
255
307
} else {
256
-
// FORWARD START: "repo|collection|"
257
308
startKey = prefix
258
309
}
259
310
}
260
311
261
-
// 4. Seek and Iterate
262
312
it.Seek(startKey)
263
313
264
-
// Handle Cursor Pagination Skip
265
-
// If we provided a cursor, we likely landed exactly ON that cursor.
266
-
// We want the record *after* (or *before* in reverse) the cursor.
267
314
if cursor != "" && it.Valid() {
268
-
// Badger's Seek moves to key >= seek_key (even in reverse mode logic varies slightly,
269
-
// but practically we check if we landed on the exact cursor).
270
315
if string(it.Item().Key()) == string(startKey) {
271
-
it.Next() // Skip the cursor itself
316
+
it.Next()
272
317
}
273
318
}
274
319
275
-
// Iterate as long as the key still starts with our prefix
276
320
for ; it.ValidForPrefix(prefix); it.Next() {
277
321
if len(aturis) >= limit {
278
322
break
···
298
342
"count": len(aturis),
299
343
}
300
344
301
-
// Only return cursor if we hit the limit, allowing the client to request the next page
302
345
if lastRkey != "" && len(aturis) == limit {
303
346
resp["cursor"] = lastRkey
304
347
}
···
342
385
})
343
386
}
344
387
345
-
// handleIndexRecord now takes the Slingshot client specifically
346
388
func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) {
347
-
//authedUserDid := c.GetString("user_did")
348
-
// Support JSON body preferentially, fallback to Query/Form
349
389
var req struct {
350
390
Collection string `json:"collection"`
351
391
Repo string `json:"repo"`
···
363
403
return
364
404
}
365
405
366
-
// Verify existence using Slingshot/Agnostic
367
406
recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey)
368
407
if err != nil {
369
-
// Does not exist remotely -> Delete locally
370
408
s.processRecord(req.Repo, req.Collection, req.RKey, true)
371
409
372
-
// You might want to return 200 even if deleted, to confirm "indexing done"
373
410
c.Status(200)
374
411
return
375
412
}
376
413
377
-
// Exists remotely -> Parse and Insert locally
378
414
uri := recordResponse.Uri
379
415
aturi, err := syntax.ParseATURI(uri)
380
416
if err != nil {
···
422
458
c.Status(404)
423
459
}
424
460
}
461
+
462
+
func (s *Server) handleQueryCollectionRkey(c *gin.Context) {
463
+
collection := c.Query("collection")
464
+
rkey := c.Query("rkey")
465
+
466
+
if collection == "" || rkey == "" {
467
+
c.JSON(400, gin.H{"error": "collection and rkey required"})
468
+
return
469
+
}
470
+
471
+
crKey := makeCollectionRkeyKey(collection, rkey)
472
+
var dids []string
473
+
474
+
err := s.db.View(func(txn *badger.Txn) error {
475
+
item, err := txn.Get(crKey)
476
+
if err == badger.ErrKeyNotFound {
477
+
return nil
478
+
} else if err != nil {
479
+
return err
480
+
}
481
+
482
+
return item.Value(func(val []byte) error {
483
+
return json.Unmarshal(val, &dids)
484
+
})
485
+
})
486
+
487
+
if err != nil {
488
+
c.JSON(500, gin.H{"error": err.Error()})
489
+
return
490
+
}
491
+
492
+
c.JSON(200, gin.H{
493
+
"collection": collection,
494
+
"rkey": rkey,
495
+
"dids": dids,
496
+
"count": len(dids),
497
+
})
498
+
}
+293
-10
cmd/jetrelay/main.go
+293
-10
cmd/jetrelay/main.go
···
1
1
package main
2
2
3
3
import (
4
-
"flag"
4
+
"context"
5
+
"encoding/json"
5
6
"fmt"
7
+
"io"
8
+
"log"
9
+
"net/http"
10
+
"sort"
11
+
"sync"
12
+
"time"
13
+
14
+
"github.com/gorilla/websocket"
15
+
"github.com/klauspost/compress/zstd"
6
16
)
7
17
8
-
type multiFlag []string
18
+
const (
19
+
ServerPort = ":3878"
20
+
DictionaryURL = "https://raw.githubusercontent.com/bluesky-social/jetstream/main/pkg/models/zstd_dictionary"
21
+
BufferSize = 100000
22
+
ReconnectDelay = 5 * time.Second
23
+
)
9
24
10
-
func (m *multiFlag) String() string {
11
-
return fmt.Sprint(*m)
25
+
var SourceJetstreams = []string{
26
+
"ws://localhost:6008/subscribe", // local jetstream
27
+
"ws://localhost:3877/subscribe", // local backstream
28
+
}
29
+
30
+
type Event struct {
31
+
Kind string `json:"kind"`
32
+
TimeUS int64 `json:"time_us"`
33
+
Commit json.RawMessage `json:"commit,omitempty"`
34
+
}
35
+
36
+
type BufferedEvent struct {
37
+
RelayTimeUS int64
38
+
RawJSON []byte
12
39
}
13
40
14
-
func (m *multiFlag) Set(value string) error {
15
-
*m = append(*m, value)
41
+
type History struct {
42
+
events []BufferedEvent
43
+
mu sync.RWMutex
44
+
}
45
+
46
+
func (h *History) Add(jsonBytes []byte, relayTime int64) {
47
+
h.mu.Lock()
48
+
defer h.mu.Unlock()
49
+
50
+
h.events = append(h.events, BufferedEvent{
51
+
RelayTimeUS: relayTime,
52
+
RawJSON: jsonBytes,
53
+
})
54
+
55
+
if len(h.events) > BufferSize {
56
+
h.events = h.events[len(h.events)-BufferSize:]
57
+
}
58
+
}
59
+
60
+
func (h *History) GetSince(cursor int64) []BufferedEvent {
61
+
h.mu.RLock()
62
+
defer h.mu.RUnlock()
63
+
64
+
idx := sort.Search(len(h.events), func(i int) bool {
65
+
return h.events[i].RelayTimeUS > cursor
66
+
})
67
+
68
+
if idx < len(h.events) {
69
+
result := make([]BufferedEvent, len(h.events)-idx)
70
+
copy(result, h.events[idx:])
71
+
return result
72
+
}
16
73
return nil
17
74
}
18
75
76
+
var (
77
+
history = &History{events: make([]BufferedEvent, 0, BufferSize)}
78
+
zstdDict []byte
79
+
hub *Hub
80
+
upgrader = websocket.Upgrader{
81
+
CheckOrigin: func(r *http.Request) bool { return true },
82
+
}
83
+
)
84
+
19
85
func main() {
20
-
var js multiFlag
21
-
flag.Var(&js, "j", "jetstream instances 'write multiple to input more than one'")
86
+
log.Println("Initializing Relay...")
87
+
88
+
var err error
89
+
zstdDict, err = downloadDictionary()
90
+
if err != nil {
91
+
log.Fatalf("Failed to load dictionary: %v", err)
92
+
}
93
+
94
+
hub = newHub()
95
+
go hub.run()
96
+
97
+
ctx := context.Background()
98
+
for i, url := range SourceJetstreams {
99
+
go runUpstreamConsumer(ctx, i, url)
100
+
}
101
+
102
+
http.HandleFunc("/subscribe", serveWs)
103
+
log.Printf("๐ฅ Relay Active on %s", ServerPort)
104
+
if err := http.ListenAndServe(ServerPort, nil); err != nil {
105
+
log.Fatal(err)
106
+
}
107
+
}
108
+
109
+
func runUpstreamConsumer(ctx context.Context, id int, baseURL string) {
110
+
var lastSeenCursor int64 = 0
111
+
112
+
for {
113
+
connectURL := baseURL
114
+
if lastSeenCursor > 0 {
115
+
connectURL = fmt.Sprintf("%s?cursor=%d", baseURL, lastSeenCursor)
116
+
log.Printf("[Input %d] Reconnecting with cursor: %d", id, lastSeenCursor)
117
+
} else {
118
+
log.Printf("[Input %d] Connecting fresh...", id)
119
+
}
120
+
121
+
conn, _, err := websocket.DefaultDialer.Dial(connectURL, nil)
122
+
if err != nil {
123
+
log.Printf("[Input %d] Connect failed: %v. Retrying...", id, err)
124
+
time.Sleep(ReconnectDelay)
125
+
continue
126
+
}
127
+
128
+
log.Printf("[Input %d] Connected.", id)
129
+
130
+
for {
131
+
_, msg, err := conn.ReadMessage()
132
+
if err != nil {
133
+
log.Printf("[Input %d] Read error: %v", id, err)
134
+
break
135
+
}
136
+
137
+
var genericEvent map[string]interface{}
138
+
if err := json.Unmarshal(msg, &genericEvent); err != nil {
139
+
continue
140
+
}
141
+
142
+
if t, ok := genericEvent["time_us"].(float64); ok {
143
+
lastSeenCursor = int64(t)
144
+
}
145
+
146
+
nowUS := time.Now().UnixMicro()
147
+
genericEvent["time_us"] = nowUS
148
+
149
+
finalBytes, err := json.Marshal(genericEvent)
150
+
if err != nil {
151
+
continue
152
+
}
153
+
154
+
history.Add(finalBytes, nowUS)
155
+
156
+
hub.broadcast <- BufferedEvent{RelayTimeUS: nowUS, RawJSON: finalBytes}
157
+
}
158
+
conn.Close()
159
+
time.Sleep(ReconnectDelay)
160
+
}
161
+
}
162
+
163
+
func serveWs(w http.ResponseWriter, r *http.Request) {
164
+
conn, err := upgrader.Upgrade(w, r, nil)
165
+
if err != nil {
166
+
return
167
+
}
168
+
169
+
compress := r.URL.Query().Get("compress") == "true"
170
+
171
+
var clientCursor int64 = 0
172
+
cursorStr := r.URL.Query().Get("cursor")
173
+
if cursorStr != "" {
174
+
fmt.Sscanf(cursorStr, "%d", &clientCursor)
175
+
}
176
+
177
+
client := &Client{
178
+
hub: hub,
179
+
conn: conn,
180
+
send: make(chan BufferedEvent, 2048),
181
+
compress: compress,
182
+
lastSentUS: 0,
183
+
}
184
+
185
+
if compress {
186
+
enc, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(zstdDict))
187
+
client.encoder = enc
188
+
}
189
+
190
+
client.hub.register <- client
191
+
192
+
go client.writePump()
193
+
194
+
if clientCursor > 0 {
195
+
log.Printf("Client requested replay from %d", clientCursor)
196
+
missedEvents := history.GetSince(clientCursor)
197
+
for _, evt := range missedEvents {
198
+
client.send <- evt
199
+
}
200
+
}
201
+
202
+
go client.readPump()
203
+
}
204
+
205
+
type Client struct {
206
+
hub *Hub
207
+
conn *websocket.Conn
208
+
send chan BufferedEvent
209
+
compress bool
210
+
encoder *zstd.Encoder
211
+
lastSentUS int64
212
+
}
213
+
214
+
type Hub struct {
215
+
clients map[*Client]bool
216
+
broadcast chan BufferedEvent
217
+
register chan *Client
218
+
unregister chan *Client
219
+
mu sync.RWMutex
220
+
}
221
+
222
+
func newHub() *Hub {
223
+
return &Hub{
224
+
clients: make(map[*Client]bool),
225
+
broadcast: make(chan BufferedEvent, 10000),
226
+
register: make(chan *Client),
227
+
unregister: make(chan *Client),
228
+
}
229
+
}
230
+
231
+
func (h *Hub) run() {
232
+
for {
233
+
select {
234
+
case client := <-h.register:
235
+
h.mu.Lock()
236
+
h.clients[client] = true
237
+
h.mu.Unlock()
238
+
239
+
case client := <-h.unregister:
240
+
h.mu.Lock()
241
+
if _, ok := h.clients[client]; ok {
242
+
delete(h.clients, client)
243
+
close(client.send)
244
+
if client.encoder != nil {
245
+
client.encoder.Close()
246
+
}
247
+
}
248
+
h.mu.Unlock()
249
+
250
+
case msg := <-h.broadcast:
251
+
h.mu.RLock()
252
+
for client := range h.clients {
253
+
select {
254
+
case client.send <- msg:
255
+
default:
256
+
go func(c *Client) {
257
+
h.unregister <- c
258
+
c.conn.Close()
259
+
}(client)
260
+
}
261
+
}
262
+
h.mu.RUnlock()
263
+
}
264
+
}
265
+
}
266
+
267
+
func (c *Client) writePump() {
268
+
defer c.conn.Close()
269
+
270
+
for msg := range c.send {
271
+
if msg.RelayTimeUS <= c.lastSentUS {
272
+
continue
273
+
}
274
+
275
+
c.lastSentUS = msg.RelayTimeUS
276
+
277
+
if c.compress {
278
+
compressed := c.encoder.EncodeAll(msg.RawJSON, nil)
279
+
if err := c.conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil {
280
+
return
281
+
}
282
+
} else {
283
+
if err := c.conn.WriteMessage(websocket.TextMessage, msg.RawJSON); err != nil {
284
+
return
285
+
}
286
+
}
287
+
}
288
+
}
22
289
23
-
flag.Parse()
290
+
func (c *Client) readPump() {
291
+
defer func() {
292
+
c.hub.unregister <- c
293
+
c.conn.Close()
294
+
}()
295
+
for {
296
+
if _, _, err := c.conn.ReadMessage(); err != nil {
297
+
break
298
+
}
299
+
}
300
+
}
24
301
25
-
fmt.Println(js) // prints: [hi hello what]
302
+
func downloadDictionary() ([]byte, error) {
303
+
resp, err := http.Get(DictionaryURL)
304
+
if err != nil {
305
+
return nil, err
306
+
}
307
+
defer resp.Body.Close()
308
+
return io.ReadAll(resp.Body)
26
309
}
+9
-7
shims/lex/app/bsky/feed/defs/embed.go
+9
-7
shims/lex/app/bsky/feed/defs/embed.go
···
163
163
if feedPost.Embed.EmbedRecordWithMedia.Media.EmbedVideo != nil {
164
164
videocdn := "https://video.bsky.app" // todo move this
165
165
embedVideo := EmbedVideoViewExtractor(ctx, aturi, feedPost.Embed.EmbedVideo, sl, cs, imgcdn, videocdn, viewer)
166
-
embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{
167
-
// EmbedImages_View *EmbedImages_View
168
-
// EmbedVideo_View *EmbedVideo_View
169
-
EmbedVideo_View: embedVideo.EmbedVideo_View,
170
-
// EmbedVideo_View: &appbsky.EmbedVideo_View{
166
+
if embedVideo != nil {
167
+
embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{
168
+
// EmbedImages_View *EmbedImages_View
169
+
// EmbedVideo_View *EmbedVideo_View
170
+
EmbedVideo_View: embedVideo.EmbedVideo_View,
171
+
// EmbedVideo_View: &appbsky.EmbedVideo_View{
171
172
172
-
// },
173
-
// EmbedExternal_View *EmbedExternal_View
173
+
// },
174
+
// EmbedExternal_View *EmbedExternal_View
175
+
}
174
176
}
175
177
// // video extractor
176
178
// embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{