+2
-1
.gitignore
+2
-1
.gitignore
+5
-52
cmd/aturilist/main.go
+5
-52
cmd/aturilist/main.go
···
20
"github.com/dgraph-io/badger/v4"
21
"github.com/gin-gonic/gin"
22
23
-
// Restored your specific imports
24
"tangled.org/whey.party/red-dwarf-server/auth"
25
"tangled.org/whey.party/red-dwarf-server/microcosm"
26
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
···
30
db *badger.DB
31
logger *slog.Logger
32
33
-
// Locks for specific operations if needed, though Badger is thread-safe
34
backfillTracker map[string]*sync.WaitGroup
35
backfillMutex sync.Mutex
36
}
···
73
74
initURLs(*prod)
75
76
-
// 1. Initialize DB
77
db, err := badger.Open(badger.DefaultOptions(*dbPath))
78
if err != nil {
79
logger.Error("Failed to open BadgerDB", "error", err)
···
86
logger: logger,
87
}
88
89
-
// 2. Initialize Auth
90
auther, err := auth.NewAuth(
91
100_000,
92
time.Hour*12,
93
5,
94
-
serviceWebDID, //+"#bsky_appview",
95
)
96
if err != nil {
97
log.Fatalf("Failed to create Auth: %v", err)
98
}
99
100
-
// 3. Initialize Clients
101
ctx := context.Background()
102
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
103
104
-
// 4. Initialize Jetstream
105
config := client.DefaultClientConfig()
106
config.WebsocketURL = JETSTREAM_URL
107
config.Compress = true
···
115
return
116
}
117
118
-
// Connect with cursor (5 minutes ago)
119
cursor := time.Now().Add(-5 * time.Minute).UnixMicro()
120
121
go func() {
122
logger.Info("Connecting to Jetstream...")
123
-
/*
124
-
If you resume a jetstream firehose from a cursor, everything works fine until you catch up to real time.
125
-
At that point, the connection drops. If you connect without a cursor (going straight to realtime), it keeps working.
126
-
*/
127
for {
128
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
129
logger.Error("jetstream connection disconnected", "error", err)
···
131
132
select {
133
case <-ctx.Done():
134
-
return // Context cancelled, exit loop
135
default:
136
logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor)
137
time.Sleep(5 * time.Second)
···
139
}
140
}()
141
142
-
// 5. Initialize Router
143
router := gin.New()
144
router.Use(auther.AuthenticateGinRequestViaJWT)
145
···
147
148
router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords)
149
150
-
// heavily rate limited because can be used for spam.
151
router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) {
152
srv.handleIndexRecord(c, sl)
153
})
···
158
159
router.Run(":7155")
160
}
161
-
162
-
// --- Jetstream Handler ---
163
164
type JetstreamHandler struct {
165
srv *Server
···
168
func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error {
169
if event != nil {
170
if event.Commit != nil {
171
-
// Identify Delete operation
172
isDelete := event.Commit.Operation == models.CommitOperationDelete
173
174
-
// Process
175
h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete)
176
177
}
···
179
return nil
180
}
181
182
-
// --- DB Helpers ---
183
-
184
func makeKey(repo, collection, rkey string) []byte {
185
return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey))
186
}
···
193
return parts[0], parts[1], parts[2], nil
194
}
195
196
-
// processRecord handles the DB write/delete.
197
-
// isDelete=true removes the key. isDelete=false sets the key.
198
func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) {
199
key := makeKey(repo, collection, rkey)
200
···
202
if isDelete {
203
return txn.Delete(key)
204
}
205
-
// On create/update, store current timestamp.
206
-
// You can store more data (Cid, etc) here if needed later.
207
return txn.Set(key, []byte(time.Now().Format(time.RFC3339)))
208
})
209
···
211
s.logger.Error("Failed to update DB", "repo", repo, "rkey", rkey, "err", err)
212
}
213
}
214
-
215
-
// --- HTTP Handlers ---
216
217
func (s *Server) handleListRecords(c *gin.Context) {
218
repo := c.Query("repo")
219
collection := c.Query("collection")
220
cursor := c.Query("cursor")
221
-
reverse := c.Query("reverse") == "true" // 1. Check param
222
limit := 50
223
224
if repo == "" || collection == "" {
···
226
return
227
}
228
229
-
// Base prefix: "repo|collection|"
230
prefixStr := fmt.Sprintf("%s|%s|", repo, collection)
231
prefix := []byte(prefixStr)
232
···
234
var lastRkey string
235
236
err := s.db.View(func(txn *badger.Txn) error {
237
-
// 2. Configure Iterator Options
238
opts := badger.DefaultIteratorOptions
239
opts.PrefetchValues = false
240
-
opts.Reverse = reverse // Set reverse mode
241
242
it := txn.NewIterator(opts)
243
defer it.Close()
244
245
-
// 3. Determine Start Key
246
var startKey []byte
247
if cursor != "" {
248
-
// If cursor exists, we seek to it regardless of direction
249
startKey = makeKey(repo, collection, cursor)
250
} else {
251
if reverse {
252
-
// REVERSE START: "repo|collection|" + 0xFF
253
-
// This seeks to the theoretical end of this prefix range
254
startKey = append([]byte(prefixStr), 0xFF)
255
} else {
256
-
// FORWARD START: "repo|collection|"
257
startKey = prefix
258
}
259
}
260
261
-
// 4. Seek and Iterate
262
it.Seek(startKey)
263
264
-
// Handle Cursor Pagination Skip
265
-
// If we provided a cursor, we likely landed exactly ON that cursor.
266
-
// We want the record *after* (or *before* in reverse) the cursor.
267
if cursor != "" && it.Valid() {
268
-
// Badger's Seek moves to key >= seek_key (even in reverse mode logic varies slightly,
269
-
// but practically we check if we landed on the exact cursor).
270
if string(it.Item().Key()) == string(startKey) {
271
-
it.Next() // Skip the cursor itself
272
}
273
}
274
275
-
// Iterate as long as the key still starts with our prefix
276
for ; it.ValidForPrefix(prefix); it.Next() {
277
if len(aturis) >= limit {
278
break
···
298
"count": len(aturis),
299
}
300
301
-
// Only return cursor if we hit the limit, allowing the client to request the next page
302
if lastRkey != "" && len(aturis) == limit {
303
resp["cursor"] = lastRkey
304
}
···
342
})
343
}
344
345
-
// handleIndexRecord now takes the Slingshot client specifically
346
func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) {
347
-
//authedUserDid := c.GetString("user_did")
348
-
// Support JSON body preferentially, fallback to Query/Form
349
var req struct {
350
Collection string `json:"collection"`
351
Repo string `json:"repo"`
···
363
return
364
}
365
366
-
// Verify existence using Slingshot/Agnostic
367
recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey)
368
if err != nil {
369
-
// Does not exist remotely -> Delete locally
370
s.processRecord(req.Repo, req.Collection, req.RKey, true)
371
372
-
// You might want to return 200 even if deleted, to confirm "indexing done"
373
c.Status(200)
374
return
375
}
376
377
-
// Exists remotely -> Parse and Insert locally
378
uri := recordResponse.Uri
379
aturi, err := syntax.ParseATURI(uri)
380
if err != nil {
···
20
"github.com/dgraph-io/badger/v4"
21
"github.com/gin-gonic/gin"
22
23
"tangled.org/whey.party/red-dwarf-server/auth"
24
"tangled.org/whey.party/red-dwarf-server/microcosm"
25
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
···
29
db *badger.DB
30
logger *slog.Logger
31
32
backfillTracker map[string]*sync.WaitGroup
33
backfillMutex sync.Mutex
34
}
···
71
72
initURLs(*prod)
73
74
db, err := badger.Open(badger.DefaultOptions(*dbPath))
75
if err != nil {
76
logger.Error("Failed to open BadgerDB", "error", err)
···
83
logger: logger,
84
}
85
86
auther, err := auth.NewAuth(
87
100_000,
88
time.Hour*12,
89
5,
90
+
serviceWebDID,
91
)
92
if err != nil {
93
log.Fatalf("Failed to create Auth: %v", err)
94
}
95
96
ctx := context.Background()
97
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
98
99
config := client.DefaultClientConfig()
100
config.WebsocketURL = JETSTREAM_URL
101
config.Compress = true
···
109
return
110
}
111
112
cursor := time.Now().Add(-5 * time.Minute).UnixMicro()
113
114
go func() {
115
logger.Info("Connecting to Jetstream...")
116
for {
117
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
118
logger.Error("jetstream connection disconnected", "error", err)
···
120
121
select {
122
case <-ctx.Done():
123
+
return
124
default:
125
logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor)
126
time.Sleep(5 * time.Second)
···
128
}
129
}()
130
131
router := gin.New()
132
router.Use(auther.AuthenticateGinRequestViaJWT)
133
···
135
136
router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords)
137
138
router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) {
139
srv.handleIndexRecord(c, sl)
140
})
···
145
146
router.Run(":7155")
147
}
148
149
type JetstreamHandler struct {
150
srv *Server
···
153
func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error {
154
if event != nil {
155
if event.Commit != nil {
156
isDelete := event.Commit.Operation == models.CommitOperationDelete
157
158
h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete)
159
160
}
···
162
return nil
163
}
164
165
func makeKey(repo, collection, rkey string) []byte {
166
return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey))
167
}
···
174
return parts[0], parts[1], parts[2], nil
175
}
176
177
func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) {
178
key := makeKey(repo, collection, rkey)
179
···
181
if isDelete {
182
return txn.Delete(key)
183
}
184
return txn.Set(key, []byte(time.Now().Format(time.RFC3339)))
185
})
186
···
188
s.logger.Error("Failed to update DB", "repo", repo, "rkey", rkey, "err", err)
189
}
190
}
191
192
func (s *Server) handleListRecords(c *gin.Context) {
193
repo := c.Query("repo")
194
collection := c.Query("collection")
195
cursor := c.Query("cursor")
196
+
reverse := c.Query("reverse") == "true"
197
limit := 50
198
199
if repo == "" || collection == "" {
···
201
return
202
}
203
204
prefixStr := fmt.Sprintf("%s|%s|", repo, collection)
205
prefix := []byte(prefixStr)
206
···
208
var lastRkey string
209
210
err := s.db.View(func(txn *badger.Txn) error {
211
opts := badger.DefaultIteratorOptions
212
opts.PrefetchValues = false
213
+
opts.Reverse = reverse
214
215
it := txn.NewIterator(opts)
216
defer it.Close()
217
218
var startKey []byte
219
if cursor != "" {
220
startKey = makeKey(repo, collection, cursor)
221
} else {
222
if reverse {
223
startKey = append([]byte(prefixStr), 0xFF)
224
} else {
225
startKey = prefix
226
}
227
}
228
229
it.Seek(startKey)
230
231
if cursor != "" && it.Valid() {
232
if string(it.Item().Key()) == string(startKey) {
233
+
it.Next()
234
}
235
}
236
237
for ; it.ValidForPrefix(prefix); it.Next() {
238
if len(aturis) >= limit {
239
break
···
259
"count": len(aturis),
260
}
261
262
if lastRkey != "" && len(aturis) == limit {
263
resp["cursor"] = lastRkey
264
}
···
302
})
303
}
304
305
func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) {
306
var req struct {
307
Collection string `json:"collection"`
308
Repo string `json:"repo"`
···
320
return
321
}
322
323
recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey)
324
if err != nil {
325
s.processRecord(req.Repo, req.Collection, req.RKey, true)
326
327
c.Status(200)
328
return
329
}
330
331
uri := recordResponse.Uri
332
aturi, err := syntax.ParseATURI(uri)
333
if err != nil {
+293
-10
cmd/jetrelay/main.go
+293
-10
cmd/jetrelay/main.go
···
1
package main
2
3
import (
4
-
"flag"
5
"fmt"
6
)
7
8
-
type multiFlag []string
9
10
-
func (m *multiFlag) String() string {
11
-
return fmt.Sprint(*m)
12
}
13
14
-
func (m *multiFlag) Set(value string) error {
15
-
*m = append(*m, value)
16
return nil
17
}
18
19
func main() {
20
-
var js multiFlag
21
-
flag.Var(&js, "j", "jetstream instances 'write multiple to input more than one'")
22
23
-
flag.Parse()
24
25
-
fmt.Println(js) // prints: [hi hello what]
26
}
···
1
package main
2
3
import (
4
+
"context"
5
+
"encoding/json"
6
"fmt"
7
+
"io"
8
+
"log"
9
+
"net/http"
10
+
"sort"
11
+
"sync"
12
+
"time"
13
+
14
+
"github.com/gorilla/websocket"
15
+
"github.com/klauspost/compress/zstd"
16
)
17
18
+
const (
19
+
ServerPort = ":3878"
20
+
DictionaryURL = "https://raw.githubusercontent.com/bluesky-social/jetstream/main/pkg/models/zstd_dictionary"
21
+
BufferSize = 100000
22
+
ReconnectDelay = 5 * time.Second
23
+
)
24
25
+
var SourceJetstreams = []string{
26
+
"ws://localhost:6008/subscribe", // local jetstream
27
+
"ws://localhost:3877/subscribe", // local backstream
28
+
}
29
+
30
+
type Event struct {
31
+
Kind string `json:"kind"`
32
+
TimeUS int64 `json:"time_us"`
33
+
Commit json.RawMessage `json:"commit,omitempty"`
34
+
}
35
+
36
+
type BufferedEvent struct {
37
+
RelayTimeUS int64
38
+
RawJSON []byte
39
}
40
41
+
type History struct {
42
+
events []BufferedEvent
43
+
mu sync.RWMutex
44
+
}
45
+
46
+
func (h *History) Add(jsonBytes []byte, relayTime int64) {
47
+
h.mu.Lock()
48
+
defer h.mu.Unlock()
49
+
50
+
h.events = append(h.events, BufferedEvent{
51
+
RelayTimeUS: relayTime,
52
+
RawJSON: jsonBytes,
53
+
})
54
+
55
+
if len(h.events) > BufferSize {
56
+
h.events = h.events[len(h.events)-BufferSize:]
57
+
}
58
+
}
59
+
60
+
func (h *History) GetSince(cursor int64) []BufferedEvent {
61
+
h.mu.RLock()
62
+
defer h.mu.RUnlock()
63
+
64
+
idx := sort.Search(len(h.events), func(i int) bool {
65
+
return h.events[i].RelayTimeUS > cursor
66
+
})
67
+
68
+
if idx < len(h.events) {
69
+
result := make([]BufferedEvent, len(h.events)-idx)
70
+
copy(result, h.events[idx:])
71
+
return result
72
+
}
73
return nil
74
}
75
76
+
var (
77
+
history = &History{events: make([]BufferedEvent, 0, BufferSize)}
78
+
zstdDict []byte
79
+
hub *Hub
80
+
upgrader = websocket.Upgrader{
81
+
CheckOrigin: func(r *http.Request) bool { return true },
82
+
}
83
+
)
84
+
85
func main() {
86
+
log.Println("Initializing Relay...")
87
+
88
+
var err error
89
+
zstdDict, err = downloadDictionary()
90
+
if err != nil {
91
+
log.Fatalf("Failed to load dictionary: %v", err)
92
+
}
93
+
94
+
hub = newHub()
95
+
go hub.run()
96
+
97
+
ctx := context.Background()
98
+
for i, url := range SourceJetstreams {
99
+
go runUpstreamConsumer(ctx, i, url)
100
+
}
101
+
102
+
http.HandleFunc("/subscribe", serveWs)
103
+
log.Printf("🔥 Relay Active on %s", ServerPort)
104
+
if err := http.ListenAndServe(ServerPort, nil); err != nil {
105
+
log.Fatal(err)
106
+
}
107
+
}
108
+
109
+
func runUpstreamConsumer(ctx context.Context, id int, baseURL string) {
110
+
var lastSeenCursor int64 = 0
111
+
112
+
for {
113
+
connectURL := baseURL
114
+
if lastSeenCursor > 0 {
115
+
connectURL = fmt.Sprintf("%s?cursor=%d", baseURL, lastSeenCursor)
116
+
log.Printf("[Input %d] Reconnecting with cursor: %d", id, lastSeenCursor)
117
+
} else {
118
+
log.Printf("[Input %d] Connecting fresh...", id)
119
+
}
120
+
121
+
conn, _, err := websocket.DefaultDialer.Dial(connectURL, nil)
122
+
if err != nil {
123
+
log.Printf("[Input %d] Connect failed: %v. Retrying...", id, err)
124
+
time.Sleep(ReconnectDelay)
125
+
continue
126
+
}
127
+
128
+
log.Printf("[Input %d] Connected.", id)
129
+
130
+
for {
131
+
_, msg, err := conn.ReadMessage()
132
+
if err != nil {
133
+
log.Printf("[Input %d] Read error: %v", id, err)
134
+
break
135
+
}
136
+
137
+
var genericEvent map[string]interface{}
138
+
if err := json.Unmarshal(msg, &genericEvent); err != nil {
139
+
continue
140
+
}
141
+
142
+
if t, ok := genericEvent["time_us"].(float64); ok {
143
+
lastSeenCursor = int64(t)
144
+
}
145
+
146
+
nowUS := time.Now().UnixMicro()
147
+
genericEvent["time_us"] = nowUS
148
+
149
+
finalBytes, err := json.Marshal(genericEvent)
150
+
if err != nil {
151
+
continue
152
+
}
153
+
154
+
history.Add(finalBytes, nowUS)
155
+
156
+
hub.broadcast <- BufferedEvent{RelayTimeUS: nowUS, RawJSON: finalBytes}
157
+
}
158
+
conn.Close()
159
+
time.Sleep(ReconnectDelay)
160
+
}
161
+
}
162
+
163
+
func serveWs(w http.ResponseWriter, r *http.Request) {
164
+
conn, err := upgrader.Upgrade(w, r, nil)
165
+
if err != nil {
166
+
return
167
+
}
168
+
169
+
compress := r.URL.Query().Get("compress") == "true"
170
+
171
+
var clientCursor int64 = 0
172
+
cursorStr := r.URL.Query().Get("cursor")
173
+
if cursorStr != "" {
174
+
fmt.Sscanf(cursorStr, "%d", &clientCursor)
175
+
}
176
+
177
+
client := &Client{
178
+
hub: hub,
179
+
conn: conn,
180
+
send: make(chan BufferedEvent, 2048),
181
+
compress: compress,
182
+
lastSentUS: 0,
183
+
}
184
+
185
+
if compress {
186
+
enc, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(zstdDict))
187
+
client.encoder = enc
188
+
}
189
+
190
+
client.hub.register <- client
191
+
192
+
go client.writePump()
193
+
194
+
if clientCursor > 0 {
195
+
log.Printf("Client requested replay from %d", clientCursor)
196
+
missedEvents := history.GetSince(clientCursor)
197
+
for _, evt := range missedEvents {
198
+
client.send <- evt
199
+
}
200
+
}
201
+
202
+
go client.readPump()
203
+
}
204
+
205
+
type Client struct {
206
+
hub *Hub
207
+
conn *websocket.Conn
208
+
send chan BufferedEvent
209
+
compress bool
210
+
encoder *zstd.Encoder
211
+
lastSentUS int64
212
+
}
213
+
214
+
type Hub struct {
215
+
clients map[*Client]bool
216
+
broadcast chan BufferedEvent
217
+
register chan *Client
218
+
unregister chan *Client
219
+
mu sync.RWMutex
220
+
}
221
+
222
+
func newHub() *Hub {
223
+
return &Hub{
224
+
clients: make(map[*Client]bool),
225
+
broadcast: make(chan BufferedEvent, 10000),
226
+
register: make(chan *Client),
227
+
unregister: make(chan *Client),
228
+
}
229
+
}
230
+
231
+
func (h *Hub) run() {
232
+
for {
233
+
select {
234
+
case client := <-h.register:
235
+
h.mu.Lock()
236
+
h.clients[client] = true
237
+
h.mu.Unlock()
238
+
239
+
case client := <-h.unregister:
240
+
h.mu.Lock()
241
+
if _, ok := h.clients[client]; ok {
242
+
delete(h.clients, client)
243
+
close(client.send)
244
+
if client.encoder != nil {
245
+
client.encoder.Close()
246
+
}
247
+
}
248
+
h.mu.Unlock()
249
+
250
+
case msg := <-h.broadcast:
251
+
h.mu.RLock()
252
+
for client := range h.clients {
253
+
select {
254
+
case client.send <- msg:
255
+
default:
256
+
go func(c *Client) {
257
+
h.unregister <- c
258
+
c.conn.Close()
259
+
}(client)
260
+
}
261
+
}
262
+
h.mu.RUnlock()
263
+
}
264
+
}
265
+
}
266
+
267
+
func (c *Client) writePump() {
268
+
defer c.conn.Close()
269
+
270
+
for msg := range c.send {
271
+
if msg.RelayTimeUS <= c.lastSentUS {
272
+
continue
273
+
}
274
+
275
+
c.lastSentUS = msg.RelayTimeUS
276
+
277
+
if c.compress {
278
+
compressed := c.encoder.EncodeAll(msg.RawJSON, nil)
279
+
if err := c.conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil {
280
+
return
281
+
}
282
+
} else {
283
+
if err := c.conn.WriteMessage(websocket.TextMessage, msg.RawJSON); err != nil {
284
+
return
285
+
}
286
+
}
287
+
}
288
+
}
289
290
+
func (c *Client) readPump() {
291
+
defer func() {
292
+
c.hub.unregister <- c
293
+
c.conn.Close()
294
+
}()
295
+
for {
296
+
if _, _, err := c.conn.ReadMessage(); err != nil {
297
+
break
298
+
}
299
+
}
300
+
}
301
302
+
func downloadDictionary() ([]byte, error) {
303
+
resp, err := http.Get(DictionaryURL)
304
+
if err != nil {
305
+
return nil, err
306
+
}
307
+
defer resp.Body.Close()
308
+
return io.ReadAll(resp.Body)
309
}