bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm

Compare changes

Choose any two refs to compare.

Changed files
+458 -76
cmd
aturilist
jetrelay
shims
lex
app
bsky
feed
defs
+2 -1
.gitignore
··· 1 - cmd/aturilist/badger_data 1 + cmd/aturilist/badger_data 2 + cmd/backstream/temp
+27 -5
cmd/aturilist/client/client.go
··· 13 13 14 14 // Constants for the XRPC methods 15 15 const ( 16 - MethodListRecords = "app.reddwarf.aturilist.listRecords" 17 - MethodCountRecords = "app.reddwarf.aturilist.countRecords" 18 - MethodIndexRecord = "app.reddwarf.aturilist.indexRecord" 19 - MethodValidateRecord = "app.reddwarf.aturilist.validateRecord" 20 - DefaultProductionHost = "https://aturilist.reddwarf.app" 16 + MethodListRecords = "app.reddwarf.aturilist.listRecords" 17 + MethodCountRecords = "app.reddwarf.aturilist.countRecords" 18 + MethodIndexRecord = "app.reddwarf.aturilist.indexRecord" 19 + MethodValidateRecord = "app.reddwarf.aturilist.validateRecord" 20 + MethodQueryCollectionRkey = "app.reddwarf.aturilist.queryCollectionRkey" 21 + DefaultProductionHost = "https://aturilist.reddwarf.app" 21 22 ) 22 23 23 24 // Client is the API client for the Red Dwarf AtURI List Service. ··· 53 54 Repo string `json:"repo"` 54 55 Collection string `json:"collection"` 55 56 Count int `json:"count"` 57 + } 58 + 59 + type QueryCollectionRkeyResponse struct { 60 + Collection string `json:"collection"` 61 + RKey string `json:"rkey"` 62 + DIDs []string `json:"dids"` 63 + Count int `json:"count"` 56 64 } 57 65 58 66 type ErrorResponse struct { ··· 138 146 } 139 147 140 148 return true, nil 149 + } 150 + 151 + // QueryCollectionRkey returns a list of DIDs that have a specific collection and rkey pair. 152 + func (c *Client) QueryCollectionRkey(ctx context.Context, collection, rkey string) (*QueryCollectionRkeyResponse, error) { 153 + params := url.Values{} 154 + params.Set("collection", collection) 155 + params.Set("rkey", rkey) 156 + 157 + var resp QueryCollectionRkeyResponse 158 + if err := c.doRequest(ctx, http.MethodGet, MethodQueryCollectionRkey, params, nil, &resp); err != nil { 159 + return nil, err 160 + } 161 + 162 + return &resp, nil 141 163 } 142 164 143 165 // --- Internal Helpers ---
+127 -53
cmd/aturilist/main.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 5 6 "errors" 6 7 "flag" 7 8 "fmt" ··· 20 21 "github.com/dgraph-io/badger/v4" 21 22 "github.com/gin-gonic/gin" 22 23 23 - // Restored your specific imports 24 24 "tangled.org/whey.party/red-dwarf-server/auth" 25 25 "tangled.org/whey.party/red-dwarf-server/microcosm" 26 26 "tangled.org/whey.party/red-dwarf-server/microcosm/slingshot" ··· 30 30 db *badger.DB 31 31 logger *slog.Logger 32 32 33 - // Locks for specific operations if needed, though Badger is thread-safe 34 33 backfillTracker map[string]*sync.WaitGroup 35 34 backfillMutex sync.Mutex 36 35 } ··· 73 72 74 73 initURLs(*prod) 75 74 76 - // 1. Initialize DB 77 75 db, err := badger.Open(badger.DefaultOptions(*dbPath)) 78 76 if err != nil { 79 77 logger.Error("Failed to open BadgerDB", "error", err) ··· 86 84 logger: logger, 87 85 } 88 86 89 - // 2. Initialize Auth 90 87 auther, err := auth.NewAuth( 91 88 100_000, 92 89 time.Hour*12, 93 90 5, 94 - serviceWebDID, //+"#bsky_appview", 91 + serviceWebDID, 95 92 ) 96 93 if err != nil { 97 94 log.Fatalf("Failed to create Auth: %v", err) 98 95 } 99 96 100 - // 3. Initialize Clients 101 97 ctx := context.Background() 102 98 sl := slingshot.NewSlingshot(SLINGSHOT_URL) 103 99 104 - // 4. Initialize Jetstream 105 100 config := client.DefaultClientConfig() 106 101 config.WebsocketURL = JETSTREAM_URL 107 102 config.Compress = true ··· 115 110 return 116 111 } 117 112 118 - // Connect with cursor (5 minutes ago) 119 113 cursor := time.Now().Add(-5 * time.Minute).UnixMicro() 120 114 121 115 go func() { 122 116 logger.Info("Connecting to Jetstream...") 123 - /* 124 - If you resume a jetstream firehose from a cursor, everything works fine until you catch up to real time. 125 - At that point, the connection drops. If you connect without a cursor (going straight to realtime), it keeps working. 126 - */ 127 117 for { 128 118 if err := c.ConnectAndRead(ctx, &cursor); err != nil { 129 119 logger.Error("jetstream connection disconnected", "error", err) ··· 131 121 132 122 select { 133 123 case <-ctx.Done(): 134 - return // Context cancelled, exit loop 124 + return 135 125 default: 136 126 logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor) 137 127 time.Sleep(5 * time.Second) ··· 139 129 } 140 130 }() 141 131 142 - // 5. Initialize Router 143 132 router := gin.New() 144 133 router.Use(auther.AuthenticateGinRequestViaJWT) 145 134 ··· 147 136 148 137 router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords) 149 138 150 - // heavily rate limited because can be used for spam. 151 139 router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) { 152 140 srv.handleIndexRecord(c, sl) 153 141 }) 154 142 155 143 router.POST("/xrpc/app.reddwarf.aturilist.validateRecord", srv.handleValidateRecord) 156 144 145 + router.GET("/xrpc/app.reddwarf.aturilist.queryCollectionRkey", srv.handleQueryCollectionRkey) 146 + 157 147 // router.GET("/xrpc/app.reddwarf.aturilist.requestBackfill", ) 158 148 159 149 router.Run(":7155") 160 150 } 161 - 162 - // --- Jetstream Handler --- 163 151 164 152 type JetstreamHandler struct { 165 153 srv *Server ··· 168 156 func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error { 169 157 if event != nil { 170 158 if event.Commit != nil { 171 - // Identify Delete operation 172 159 isDelete := event.Commit.Operation == models.CommitOperationDelete 173 160 174 - // Process 175 161 h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete) 176 162 177 163 } 178 164 } 179 165 return nil 180 166 } 181 - 182 - // --- DB Helpers --- 183 167 184 168 func makeKey(repo, collection, rkey string) []byte { 185 169 return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey)) ··· 193 177 return parts[0], parts[1], parts[2], nil 194 178 } 195 179 196 - // processRecord handles the DB write/delete. 197 - // isDelete=true removes the key. isDelete=false sets the key. 180 + func makeCollectionRkeyKey(collection, rkey string) []byte { 181 + return []byte(fmt.Sprintf("cr|%s|%s|", collection, rkey)) 182 + } 183 + 184 + func parseCollectionRkeyKey(key []byte) (collection, rkey string, err error) { 185 + parts := strings.Split(string(key), "|") 186 + if len(parts) < 3 || parts[0] != "cr" { 187 + return "", "", errors.New("invalid collection+rkey key format") 188 + } 189 + return parts[1], parts[2], nil 190 + } 191 + 198 192 func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) { 199 193 key := makeKey(repo, collection, rkey) 194 + crKey := makeCollectionRkeyKey(collection, rkey) 200 195 201 196 err := s.db.Update(func(txn *badger.Txn) error { 202 197 if isDelete { 203 - return txn.Delete(key) 198 + if err := txn.Delete(key); err != nil { 199 + return err 200 + } 201 + return s.removeDidFromCollectionRkeyIndex(txn, crKey, repo) 204 202 } 205 - // On create/update, store current timestamp. 206 - // You can store more data (Cid, etc) here if needed later. 207 - return txn.Set(key, []byte(time.Now().Format(time.RFC3339))) 203 + if err := txn.Set(key, []byte(time.Now().Format(time.RFC3339))); err != nil { 204 + return err 205 + } 206 + return s.addDidToCollectionRkeyIndex(txn, crKey, repo) 208 207 }) 209 208 210 209 if err != nil { ··· 212 211 } 213 212 } 214 213 215 - // --- HTTP Handlers --- 214 + func (s *Server) addDidToCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error { 215 + item, err := txn.Get(crKey) 216 + if err == badger.ErrKeyNotFound { 217 + var dids []string 218 + dids = append(dids, did) 219 + didsJSON, _ := json.Marshal(dids) 220 + return txn.Set(crKey, didsJSON) 221 + } else if err != nil { 222 + return err 223 + } 224 + 225 + var dids []string 226 + err = item.Value(func(val []byte) error { 227 + return json.Unmarshal(val, &dids) 228 + }) 229 + if err != nil { 230 + return err 231 + } 232 + 233 + for _, existingDid := range dids { 234 + if existingDid == did { 235 + return nil 236 + } 237 + } 238 + 239 + dids = append(dids, did) 240 + didsJSON, _ := json.Marshal(dids) 241 + return txn.Set(crKey, didsJSON) 242 + } 243 + 244 + func (s *Server) removeDidFromCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error { 245 + item, err := txn.Get(crKey) 246 + if err == badger.ErrKeyNotFound { 247 + return nil 248 + } else if err != nil { 249 + return err 250 + } 251 + 252 + var dids []string 253 + err = item.Value(func(val []byte) error { 254 + return json.Unmarshal(val, &dids) 255 + }) 256 + if err != nil { 257 + return err 258 + } 259 + 260 + var newDids []string 261 + for _, existingDid := range dids { 262 + if existingDid != did { 263 + newDids = append(newDids, existingDid) 264 + } 265 + } 266 + 267 + if len(newDids) == 0 { 268 + return txn.Delete(crKey) 269 + } 270 + 271 + didsJSON, _ := json.Marshal(newDids) 272 + return txn.Set(crKey, didsJSON) 273 + } 216 274 217 275 func (s *Server) handleListRecords(c *gin.Context) { 218 276 repo := c.Query("repo") 219 277 collection := c.Query("collection") 220 278 cursor := c.Query("cursor") 221 - reverse := c.Query("reverse") == "true" // 1. Check param 279 + reverse := c.Query("reverse") == "true" 222 280 limit := 50 223 281 224 282 if repo == "" || collection == "" { ··· 226 284 return 227 285 } 228 286 229 - // Base prefix: "repo|collection|" 230 287 prefixStr := fmt.Sprintf("%s|%s|", repo, collection) 231 288 prefix := []byte(prefixStr) 232 289 ··· 234 291 var lastRkey string 235 292 236 293 err := s.db.View(func(txn *badger.Txn) error { 237 - // 2. Configure Iterator Options 238 294 opts := badger.DefaultIteratorOptions 239 295 opts.PrefetchValues = false 240 - opts.Reverse = reverse // Set reverse mode 296 + opts.Reverse = reverse 241 297 242 298 it := txn.NewIterator(opts) 243 299 defer it.Close() 244 300 245 - // 3. Determine Start Key 246 301 var startKey []byte 247 302 if cursor != "" { 248 - // If cursor exists, we seek to it regardless of direction 249 303 startKey = makeKey(repo, collection, cursor) 250 304 } else { 251 305 if reverse { 252 - // REVERSE START: "repo|collection|" + 0xFF 253 - // This seeks to the theoretical end of this prefix range 254 306 startKey = append([]byte(prefixStr), 0xFF) 255 307 } else { 256 - // FORWARD START: "repo|collection|" 257 308 startKey = prefix 258 309 } 259 310 } 260 311 261 - // 4. Seek and Iterate 262 312 it.Seek(startKey) 263 313 264 - // Handle Cursor Pagination Skip 265 - // If we provided a cursor, we likely landed exactly ON that cursor. 266 - // We want the record *after* (or *before* in reverse) the cursor. 267 314 if cursor != "" && it.Valid() { 268 - // Badger's Seek moves to key >= seek_key (even in reverse mode logic varies slightly, 269 - // but practically we check if we landed on the exact cursor). 270 315 if string(it.Item().Key()) == string(startKey) { 271 - it.Next() // Skip the cursor itself 316 + it.Next() 272 317 } 273 318 } 274 319 275 - // Iterate as long as the key still starts with our prefix 276 320 for ; it.ValidForPrefix(prefix); it.Next() { 277 321 if len(aturis) >= limit { 278 322 break ··· 298 342 "count": len(aturis), 299 343 } 300 344 301 - // Only return cursor if we hit the limit, allowing the client to request the next page 302 345 if lastRkey != "" && len(aturis) == limit { 303 346 resp["cursor"] = lastRkey 304 347 } ··· 342 385 }) 343 386 } 344 387 345 - // handleIndexRecord now takes the Slingshot client specifically 346 388 func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) { 347 - //authedUserDid := c.GetString("user_did") 348 - // Support JSON body preferentially, fallback to Query/Form 349 389 var req struct { 350 390 Collection string `json:"collection"` 351 391 Repo string `json:"repo"` ··· 363 403 return 364 404 } 365 405 366 - // Verify existence using Slingshot/Agnostic 367 406 recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey) 368 407 if err != nil { 369 - // Does not exist remotely -> Delete locally 370 408 s.processRecord(req.Repo, req.Collection, req.RKey, true) 371 409 372 - // You might want to return 200 even if deleted, to confirm "indexing done" 373 410 c.Status(200) 374 411 return 375 412 } 376 413 377 - // Exists remotely -> Parse and Insert locally 378 414 uri := recordResponse.Uri 379 415 aturi, err := syntax.ParseATURI(uri) 380 416 if err != nil { ··· 422 458 c.Status(404) 423 459 } 424 460 } 461 + 462 + func (s *Server) handleQueryCollectionRkey(c *gin.Context) { 463 + collection := c.Query("collection") 464 + rkey := c.Query("rkey") 465 + 466 + if collection == "" || rkey == "" { 467 + c.JSON(400, gin.H{"error": "collection and rkey required"}) 468 + return 469 + } 470 + 471 + crKey := makeCollectionRkeyKey(collection, rkey) 472 + var dids []string 473 + 474 + err := s.db.View(func(txn *badger.Txn) error { 475 + item, err := txn.Get(crKey) 476 + if err == badger.ErrKeyNotFound { 477 + return nil 478 + } else if err != nil { 479 + return err 480 + } 481 + 482 + return item.Value(func(val []byte) error { 483 + return json.Unmarshal(val, &dids) 484 + }) 485 + }) 486 + 487 + if err != nil { 488 + c.JSON(500, gin.H{"error": err.Error()}) 489 + return 490 + } 491 + 492 + c.JSON(200, gin.H{ 493 + "collection": collection, 494 + "rkey": rkey, 495 + "dids": dids, 496 + "count": len(dids), 497 + }) 498 + }
+293 -10
cmd/jetrelay/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "flag" 4 + "context" 5 + "encoding/json" 5 6 "fmt" 7 + "io" 8 + "log" 9 + "net/http" 10 + "sort" 11 + "sync" 12 + "time" 13 + 14 + "github.com/gorilla/websocket" 15 + "github.com/klauspost/compress/zstd" 6 16 ) 7 17 8 - type multiFlag []string 18 + const ( 19 + ServerPort = ":3878" 20 + DictionaryURL = "https://raw.githubusercontent.com/bluesky-social/jetstream/main/pkg/models/zstd_dictionary" 21 + BufferSize = 100000 22 + ReconnectDelay = 5 * time.Second 23 + ) 9 24 10 - func (m *multiFlag) String() string { 11 - return fmt.Sprint(*m) 25 + var SourceJetstreams = []string{ 26 + "ws://localhost:6008/subscribe", // local jetstream 27 + "ws://localhost:3877/subscribe", // local backstream 28 + } 29 + 30 + type Event struct { 31 + Kind string `json:"kind"` 32 + TimeUS int64 `json:"time_us"` 33 + Commit json.RawMessage `json:"commit,omitempty"` 34 + } 35 + 36 + type BufferedEvent struct { 37 + RelayTimeUS int64 38 + RawJSON []byte 12 39 } 13 40 14 - func (m *multiFlag) Set(value string) error { 15 - *m = append(*m, value) 41 + type History struct { 42 + events []BufferedEvent 43 + mu sync.RWMutex 44 + } 45 + 46 + func (h *History) Add(jsonBytes []byte, relayTime int64) { 47 + h.mu.Lock() 48 + defer h.mu.Unlock() 49 + 50 + h.events = append(h.events, BufferedEvent{ 51 + RelayTimeUS: relayTime, 52 + RawJSON: jsonBytes, 53 + }) 54 + 55 + if len(h.events) > BufferSize { 56 + h.events = h.events[len(h.events)-BufferSize:] 57 + } 58 + } 59 + 60 + func (h *History) GetSince(cursor int64) []BufferedEvent { 61 + h.mu.RLock() 62 + defer h.mu.RUnlock() 63 + 64 + idx := sort.Search(len(h.events), func(i int) bool { 65 + return h.events[i].RelayTimeUS > cursor 66 + }) 67 + 68 + if idx < len(h.events) { 69 + result := make([]BufferedEvent, len(h.events)-idx) 70 + copy(result, h.events[idx:]) 71 + return result 72 + } 16 73 return nil 17 74 } 18 75 76 + var ( 77 + history = &History{events: make([]BufferedEvent, 0, BufferSize)} 78 + zstdDict []byte 79 + hub *Hub 80 + upgrader = websocket.Upgrader{ 81 + CheckOrigin: func(r *http.Request) bool { return true }, 82 + } 83 + ) 84 + 19 85 func main() { 20 - var js multiFlag 21 - flag.Var(&js, "j", "jetstream instances 'write multiple to input more than one'") 86 + log.Println("Initializing Relay...") 87 + 88 + var err error 89 + zstdDict, err = downloadDictionary() 90 + if err != nil { 91 + log.Fatalf("Failed to load dictionary: %v", err) 92 + } 93 + 94 + hub = newHub() 95 + go hub.run() 96 + 97 + ctx := context.Background() 98 + for i, url := range SourceJetstreams { 99 + go runUpstreamConsumer(ctx, i, url) 100 + } 101 + 102 + http.HandleFunc("/subscribe", serveWs) 103 + log.Printf("๐Ÿ”ฅ Relay Active on %s", ServerPort) 104 + if err := http.ListenAndServe(ServerPort, nil); err != nil { 105 + log.Fatal(err) 106 + } 107 + } 108 + 109 + func runUpstreamConsumer(ctx context.Context, id int, baseURL string) { 110 + var lastSeenCursor int64 = 0 111 + 112 + for { 113 + connectURL := baseURL 114 + if lastSeenCursor > 0 { 115 + connectURL = fmt.Sprintf("%s?cursor=%d", baseURL, lastSeenCursor) 116 + log.Printf("[Input %d] Reconnecting with cursor: %d", id, lastSeenCursor) 117 + } else { 118 + log.Printf("[Input %d] Connecting fresh...", id) 119 + } 120 + 121 + conn, _, err := websocket.DefaultDialer.Dial(connectURL, nil) 122 + if err != nil { 123 + log.Printf("[Input %d] Connect failed: %v. Retrying...", id, err) 124 + time.Sleep(ReconnectDelay) 125 + continue 126 + } 127 + 128 + log.Printf("[Input %d] Connected.", id) 129 + 130 + for { 131 + _, msg, err := conn.ReadMessage() 132 + if err != nil { 133 + log.Printf("[Input %d] Read error: %v", id, err) 134 + break 135 + } 136 + 137 + var genericEvent map[string]interface{} 138 + if err := json.Unmarshal(msg, &genericEvent); err != nil { 139 + continue 140 + } 141 + 142 + if t, ok := genericEvent["time_us"].(float64); ok { 143 + lastSeenCursor = int64(t) 144 + } 145 + 146 + nowUS := time.Now().UnixMicro() 147 + genericEvent["time_us"] = nowUS 148 + 149 + finalBytes, err := json.Marshal(genericEvent) 150 + if err != nil { 151 + continue 152 + } 153 + 154 + history.Add(finalBytes, nowUS) 155 + 156 + hub.broadcast <- BufferedEvent{RelayTimeUS: nowUS, RawJSON: finalBytes} 157 + } 158 + conn.Close() 159 + time.Sleep(ReconnectDelay) 160 + } 161 + } 162 + 163 + func serveWs(w http.ResponseWriter, r *http.Request) { 164 + conn, err := upgrader.Upgrade(w, r, nil) 165 + if err != nil { 166 + return 167 + } 168 + 169 + compress := r.URL.Query().Get("compress") == "true" 170 + 171 + var clientCursor int64 = 0 172 + cursorStr := r.URL.Query().Get("cursor") 173 + if cursorStr != "" { 174 + fmt.Sscanf(cursorStr, "%d", &clientCursor) 175 + } 176 + 177 + client := &Client{ 178 + hub: hub, 179 + conn: conn, 180 + send: make(chan BufferedEvent, 2048), 181 + compress: compress, 182 + lastSentUS: 0, 183 + } 184 + 185 + if compress { 186 + enc, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(zstdDict)) 187 + client.encoder = enc 188 + } 189 + 190 + client.hub.register <- client 191 + 192 + go client.writePump() 193 + 194 + if clientCursor > 0 { 195 + log.Printf("Client requested replay from %d", clientCursor) 196 + missedEvents := history.GetSince(clientCursor) 197 + for _, evt := range missedEvents { 198 + client.send <- evt 199 + } 200 + } 201 + 202 + go client.readPump() 203 + } 204 + 205 + type Client struct { 206 + hub *Hub 207 + conn *websocket.Conn 208 + send chan BufferedEvent 209 + compress bool 210 + encoder *zstd.Encoder 211 + lastSentUS int64 212 + } 213 + 214 + type Hub struct { 215 + clients map[*Client]bool 216 + broadcast chan BufferedEvent 217 + register chan *Client 218 + unregister chan *Client 219 + mu sync.RWMutex 220 + } 221 + 222 + func newHub() *Hub { 223 + return &Hub{ 224 + clients: make(map[*Client]bool), 225 + broadcast: make(chan BufferedEvent, 10000), 226 + register: make(chan *Client), 227 + unregister: make(chan *Client), 228 + } 229 + } 230 + 231 + func (h *Hub) run() { 232 + for { 233 + select { 234 + case client := <-h.register: 235 + h.mu.Lock() 236 + h.clients[client] = true 237 + h.mu.Unlock() 238 + 239 + case client := <-h.unregister: 240 + h.mu.Lock() 241 + if _, ok := h.clients[client]; ok { 242 + delete(h.clients, client) 243 + close(client.send) 244 + if client.encoder != nil { 245 + client.encoder.Close() 246 + } 247 + } 248 + h.mu.Unlock() 249 + 250 + case msg := <-h.broadcast: 251 + h.mu.RLock() 252 + for client := range h.clients { 253 + select { 254 + case client.send <- msg: 255 + default: 256 + go func(c *Client) { 257 + h.unregister <- c 258 + c.conn.Close() 259 + }(client) 260 + } 261 + } 262 + h.mu.RUnlock() 263 + } 264 + } 265 + } 266 + 267 + func (c *Client) writePump() { 268 + defer c.conn.Close() 269 + 270 + for msg := range c.send { 271 + if msg.RelayTimeUS <= c.lastSentUS { 272 + continue 273 + } 274 + 275 + c.lastSentUS = msg.RelayTimeUS 276 + 277 + if c.compress { 278 + compressed := c.encoder.EncodeAll(msg.RawJSON, nil) 279 + if err := c.conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil { 280 + return 281 + } 282 + } else { 283 + if err := c.conn.WriteMessage(websocket.TextMessage, msg.RawJSON); err != nil { 284 + return 285 + } 286 + } 287 + } 288 + } 22 289 23 - flag.Parse() 290 + func (c *Client) readPump() { 291 + defer func() { 292 + c.hub.unregister <- c 293 + c.conn.Close() 294 + }() 295 + for { 296 + if _, _, err := c.conn.ReadMessage(); err != nil { 297 + break 298 + } 299 + } 300 + } 24 301 25 - fmt.Println(js) // prints: [hi hello what] 302 + func downloadDictionary() ([]byte, error) { 303 + resp, err := http.Get(DictionaryURL) 304 + if err != nil { 305 + return nil, err 306 + } 307 + defer resp.Body.Close() 308 + return io.ReadAll(resp.Body) 26 309 }
+9 -7
shims/lex/app/bsky/feed/defs/embed.go
··· 163 163 if feedPost.Embed.EmbedRecordWithMedia.Media.EmbedVideo != nil { 164 164 videocdn := "https://video.bsky.app" // todo move this 165 165 embedVideo := EmbedVideoViewExtractor(ctx, aturi, feedPost.Embed.EmbedVideo, sl, cs, imgcdn, videocdn, viewer) 166 - embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{ 167 - // EmbedImages_View *EmbedImages_View 168 - // EmbedVideo_View *EmbedVideo_View 169 - EmbedVideo_View: embedVideo.EmbedVideo_View, 170 - // EmbedVideo_View: &appbsky.EmbedVideo_View{ 166 + if embedVideo != nil { 167 + embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{ 168 + // EmbedImages_View *EmbedImages_View 169 + // EmbedVideo_View *EmbedVideo_View 170 + EmbedVideo_View: embedVideo.EmbedVideo_View, 171 + // EmbedVideo_View: &appbsky.EmbedVideo_View{ 171 172 172 - // }, 173 - // EmbedExternal_View *EmbedExternal_View 173 + // }, 174 + // EmbedExternal_View *EmbedExternal_View 175 + } 174 176 } 175 177 // // video extractor 176 178 // embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{