···11+# USING CODEGEN
22+33+i dont really understand how indigo lexgen works but this two works i guess
44+55+run this in the project root
66+77+### struct gen
88+you really only need to do this when lexicon changes
99+```bash
1010+go run github.com/bluesky-social/indigo/cmd/lexgen/ \
1111+--package labelmerge_lex --outdir ./labelmerge/lex \
1212+--external-lexicons ./labelmerge/lex/generation/external/com.atproto.label.defs.json \
1313+--build-file ./labelmerge/lex/generation/lexgen.json \
1414+./labelmerge/lex/generation/defs
1515+```
1616+1717+### server gen
1818+this is really only needed to be done once per project and never again.
1919+2020+written here for future reference.
2121+```bash
2222+go run github.com/bluesky-social/indigo/cmd/lexgen/ \
2323+--gen-server \
2424+--gen-handlers \
2525+--package main \
2626+--outdir ./cmd/labelmerge/ \
2727+--external-lexicons ./labelmerge/lex/generation/external/ \
2828+--types-import app.reddwarf.labelmerge:tangled.org/whey.party/red-dwarf-server/labelmerge/lex \
2929+--types-import com.atproto:github.com/bluesky-social/indigo/api/atproto \
3030+--build-file ./labelmerge/lex/generation/lexgen.json \
3131+./labelmerge/lex/defs/
3232+```
3333+3434+### typescript client gen
3535+3636+uses a separate tool but its kinda related so im gonna put it here too
3737+```bash
3838+npx @atproto/lex-cli gen-api ./your/typescript/project/path ./labelmerge/lex/generation/defs/app.reddwarf.labelmerge.queryLabels.json ./labelmerge/lex/generation/external/com.atproto.label.defs.json
3939+```
+45
labelmerge/lex/labelmergequeryLabels.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+// Lexicon schema: app.reddwarf.labelmerge.queryLabels
44+55+package labelmerge_lex
66+77+import (
88+ "context"
99+1010+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1111+ lexutil "github.com/bluesky-social/indigo/lex/util"
1212+)
1313+1414+// QueryLabels_Error is a "error" in the app.reddwarf.labelmerge.queryLabels schema.
1515+type QueryLabels_Error struct {
1616+ E *string `json:"e,omitempty" cborgen:"e,omitempty"`
1717+ S string `json:"s" cborgen:"s"`
1818+}
1919+2020+// QueryLabels_Output is the output of a app.reddwarf.labelmerge.queryLabels call.
2121+type QueryLabels_Output struct {
2222+ Error []*QueryLabels_Error `json:"error,omitempty" cborgen:"error,omitempty"`
2323+ Labels []*comatproto.LabelDefs_Label `json:"labels" cborgen:"labels"`
2424+}
2525+2626+// QueryLabels calls the XRPC method "app.reddwarf.labelmerge.queryLabels".
2727+//
2828+// l: List of label sources (labeler DIDs) to filter on.
2929+// s: List of label subjects (strings).
3030+// strict: If true then any errors will throw the entire query
3131+func QueryLabels(ctx context.Context, c lexutil.LexClient, l []string, s []string, strict bool) (*QueryLabels_Output, error) {
3232+ var out QueryLabels_Output
3333+3434+ params := map[string]interface{}{}
3535+ params["l"] = l
3636+ params["s"] = s
3737+ if strict {
3838+ params["strict"] = strict
3939+ }
4040+ if err := c.LexDo(ctx, lexutil.Query, "", "app.reddwarf.labelmerge.queryLabels", params, nil, &out); err != nil {
4141+ return nil, err
4242+ }
4343+4444+ return &out, nil
4545+}
+150
labelmerge/lru/lru.go
···11+package lru
22+33+import (
44+ "fmt"
55+ "sync"
66+77+ badger "github.com/dgraph-io/badger/v4"
88+)
99+1010+// Cache is a persistent, pure Badger-backed cache
1111+type Cache[K comparable, V any] struct {
1212+ db *badger.DB
1313+ mu sync.RWMutex // protects only internal operations, not the DB itself
1414+1515+ serialize func(V) ([]byte, error)
1616+ deserialize func([]byte) (V, error)
1717+1818+ // Hooks
1919+ OnAdd func(K)
2020+ OnRemove func(K)
2121+}
2222+2323+// New creates a new Badger-backed cache
2424+func New[K comparable, V any](
2525+ dbPath string,
2626+ serialize func(V) ([]byte, error),
2727+ deserialize func([]byte) (V, error),
2828+) (*Cache[K, V], error) {
2929+3030+ opts := badger.DefaultOptions(dbPath).WithLogger(nil) // suppress Badger logs
3131+ db, err := badger.Open(opts)
3232+ if err != nil {
3333+ return nil, fmt.Errorf("failed to open Badger DB: %w", err)
3434+ }
3535+3636+ return &Cache[K, V]{
3737+ db: db,
3838+ serialize: serialize,
3939+ deserialize: deserialize,
4040+ OnAdd: func(K) {},
4141+ OnRemove: func(K) {},
4242+ }, nil
4343+}
4444+4545+// Close closes the underlying DB
4646+func (c *Cache[K, V]) Close() error {
4747+ return c.db.Close()
4848+}
4949+5050+// Get retrieves a value from Badger
5151+func (c *Cache[K, V]) Get(key K) (V, bool) {
5252+ var zero V
5353+5454+ err := c.db.View(func(txn *badger.Txn) error {
5555+ item, err := txn.Get([]byte(fmt.Sprintf("%v", key)))
5656+ if err != nil {
5757+ return err
5858+ }
5959+6060+ return item.Value(func(val []byte) error {
6161+ value, err := c.deserialize(val)
6262+ if err != nil {
6363+ return err
6464+ }
6565+ zero = value
6666+ return nil
6767+ })
6868+ })
6969+7070+ if err == badger.ErrKeyNotFound {
7171+ return zero, false
7272+ } else if err != nil {
7373+ return zero, false
7474+ }
7575+7676+ // Trigger hook
7777+ if c.OnAdd != nil {
7878+ c.OnAdd(key)
7979+ }
8080+ return zero, true
8181+}
8282+8383+// Put stores a value in Badger
8484+func (c *Cache[K, V]) Put(key K, value V) {
8585+ serializedValue, err := c.serialize(value)
8686+ if err != nil {
8787+ return
8888+ }
8989+9090+ err = c.db.Update(func(txn *badger.Txn) error {
9191+ return txn.Set([]byte(fmt.Sprintf("%v", key)), serializedValue)
9292+ })
9393+ if err != nil {
9494+ return
9595+ }
9696+9797+ if c.OnAdd != nil {
9898+ c.OnAdd(key)
9999+ }
100100+}
101101+102102+// Remove deletes a value from Badger
103103+func (c *Cache[K, V]) Remove(key K) {
104104+ _ = c.db.Update(func(txn *badger.Txn) error {
105105+ return txn.Delete([]byte(fmt.Sprintf("%v", key)))
106106+ })
107107+108108+ if c.OnRemove != nil {
109109+ c.OnRemove(key)
110110+ }
111111+}
112112+113113+// Len counts the number of keys in Badger
114114+func (c *Cache[K, V]) Len() int {
115115+ count := 0
116116+ _ = c.db.View(func(txn *badger.Txn) error {
117117+ iter := txn.NewIterator(badger.DefaultIteratorOptions)
118118+ defer iter.Close()
119119+120120+ for iter.Rewind(); iter.Valid(); iter.Next() {
121121+ count++
122122+ }
123123+ return nil
124124+ })
125125+ return count
126126+}
127127+128128+// Keys returns all keys in Badger
129129+func (c *Cache[K, V]) Keys() []K {
130130+ var keys []K
131131+ _ = c.db.View(func(txn *badger.Txn) error {
132132+ iter := txn.NewIterator(badger.DefaultIteratorOptions)
133133+ defer iter.Close()
134134+135135+ for iter.Rewind(); iter.Valid(); iter.Next() {
136136+ item := iter.Item()
137137+ k := item.Key()
138138+ var key K
139139+ fmt.Sscanf(string(k), "%v", &key)
140140+ keys = append(keys, key)
141141+ }
142142+ return nil
143143+ })
144144+ return keys
145145+}
146146+147147+// Clear removes all keys
148148+func (c *Cache[K, V]) Clear() error {
149149+ return c.db.DropAll()
150150+}
+386
labelmerge/stream/stream.go
···11+package stream
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "net/http"
88+ "net/url"
99+ "strconv"
1010+ "strings"
1111+ "sync"
1212+ "time"
1313+1414+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1515+ "github.com/bluesky-social/indigo/atproto/identity"
1616+ "github.com/bluesky-social/indigo/atproto/syntax"
1717+ "github.com/bluesky-social/indigo/events"
1818+ "github.com/bluesky-social/indigo/events/schedulers/parallel"
1919+ "github.com/gorilla/websocket"
2020+)
2121+2222+const LabelSubscribePath = "xrpc/com.atproto.label.subscribeLabels"
2323+const MaxWorkers = 4
2424+const EventChannelCapacity = 1000
2525+2626+// LabelEvent is the simplified, unified event object sent to the consumer.
2727+type LabelEvent struct {
2828+ SourceDid string // The DID of the labeler service that sent the event
2929+ Cursor int64 // The sequence number (seq) of the event
3030+ Value *comatproto.LabelDefs_Label
3131+}
3232+3333+// SubscriptionContext holds the state for a single labeler connection.
3434+type SubscriptionContext struct {
3535+ DID string
3636+ ServiceURL string // Resolved WSS URL
3737+ CurrentCursor string // Sequence string used for connection restarts
3838+ WorkerCancel context.CancelFunc
3939+ WorkerCtx context.Context
4040+ IsRunning bool
4141+ mu sync.Mutex
4242+}
4343+4444+// LabelerSubscriptionManager manages connections to multiple ATProto labeler services.
4545+type LabelerSubscriptionManager struct {
4646+ log *slog.Logger
4747+ managerCtx context.Context
4848+ managerCancel context.CancelFunc
4949+ wg sync.WaitGroup
5050+5151+ resolver identity.Directory // DID Resolver dependency
5252+5353+ subscriptions map[string]*SubscriptionContext
5454+ subMu sync.RWMutex
5555+5656+ eventCh chan LabelEvent
5757+}
5858+5959+// NewLabelerSubscriptionManager creates a new instance of the manager.
6060+func NewLabelerSubscriptionManager(log *slog.Logger) *LabelerSubscriptionManager {
6161+ ctx, cancel := context.WithCancel(context.Background())
6262+ dir := identity.DefaultDirectory() // Cached with 24hr TTL
6363+ return &LabelerSubscriptionManager{
6464+ log: log,
6565+ managerCtx: ctx,
6666+ managerCancel: cancel,
6767+ subscriptions: make(map[string]*SubscriptionContext),
6868+ eventCh: make(chan LabelEvent, EventChannelCapacity),
6969+ resolver: dir,
7070+ }
7171+}
7272+7373+// Events returns the read-only channel where all aggregated LabelEvents are sent.
7474+func (m *LabelerSubscriptionManager) Events() <-chan LabelEvent {
7575+ return m.eventCh
7676+}
7777+7878+// Start initiates the subscription manager. It attempts to resolve the URL and start workers
7979+// for all currently added labelers.
8080+func (m *LabelerSubscriptionManager) Start() {
8181+ m.subMu.RLock()
8282+ defer m.subMu.RUnlock()
8383+8484+ m.log.Info("Starting Labeler Subscription Manager", "count", len(m.subscriptions))
8585+8686+ for _, sub := range m.subscriptions {
8787+ // Initial resolution check (the worker loop handles retries)
8888+ did, err1 := syntax.ParseDID(sub.DID)
8989+ if err1 != nil {
9090+ m.log.Warn(sub.DID + "ResolutionFailure invalid did")
9191+ } else {
9292+ ident, err2 := m.resolver.LookupDID(context.TODO(), did)
9393+ if err2 != nil {
9494+ m.log.Warn(sub.DID + "ResolutionFailure not reachable")
9595+ } else {
9696+ labelerURL := ident.GetServiceEndpoint("atproto_labeler")
9797+ if labelerURL == "" {
9898+ m.log.Warn(sub.DID + "ResolutionFailure no service endpoint")
9999+ } else {
100100+ sub.ServiceURL = labelerURL
101101+ m.log.Info("Initial DID resolved successfully", "did", sub.DID, "service_url", sub.ServiceURL)
102102+ }
103103+ }
104104+ }
105105+ m.startWorker(sub)
106106+ }
107107+}
108108+109109+// Stop gracefully shuts down all active connections and the manager.
110110+func (m *LabelerSubscriptionManager) Stop() {
111111+ m.managerCancel()
112112+ m.log.Info("Waiting for all labeler workers to shut down...")
113113+ m.wg.Wait()
114114+ m.log.Info("Manager stopped gracefully.")
115115+ close(m.eventCh)
116116+}
117117+118118+// AddLabeler registers a new labeler DID. URL resolution is handled lazily (in Start or in the worker loop).
119119+func (m *LabelerSubscriptionManager) AddLabeler(did string, cursor string) error {
120120+ if did == "" {
121121+ return fmt.Errorf("did cannot be empty")
122122+ }
123123+124124+ m.subMu.Lock()
125125+ defer m.subMu.Unlock()
126126+127127+ if _, exists := m.subscriptions[did]; exists {
128128+ m.log.Warn("Labeler already exists", "did", did)
129129+ return nil
130130+ }
131131+132132+ workerCtx, workerCancel := context.WithCancel(m.managerCtx)
133133+134134+ sub := &SubscriptionContext{
135135+ DID: did,
136136+ ServiceURL: "", // Must be resolved before use
137137+ CurrentCursor: cursor,
138138+ WorkerCtx: workerCtx,
139139+ WorkerCancel: workerCancel,
140140+ IsRunning: false,
141141+ }
142142+143143+ m.subscriptions[did] = sub
144144+145145+ // If the overall manager is running, try to resolve and start the worker immediately.
146146+ select {
147147+ case <-m.managerCtx.Done():
148148+ // Manager is shutting down, just register the sub but don't start
149149+ default:
150150+ // Attempt immediate resolution and start
151151+ // entry, err := m.resolver.Resolve(sub.DID)
152152+ // if err == nil {
153153+ // sub.ServiceURL = entry.Domain
154154+ // m.log.Info("Immediate DID resolution successful for new labeler", "did", sub.DID, "service_url", sub.ServiceURL)
155155+ // } else {
156156+ // m.log.Warn("Immediate DID resolution failed for new labeler. Worker will retry.", "did", sub.DID, "err", err)
157157+ // }
158158+ // m.startWorker(sub)
159159+ did, err1 := syntax.ParseDID(sub.DID)
160160+ if err1 != nil {
161161+ m.log.Warn(sub.DID + "ResolutionFailure invalid did")
162162+ } else {
163163+ ident, err2 := m.resolver.LookupDID(context.TODO(), did)
164164+ if err2 != nil {
165165+ m.log.Warn(sub.DID + "ResolutionFailure not reachable")
166166+ } else {
167167+ labelerURL := ident.GetServiceEndpoint("atproto_labeler")
168168+ if labelerURL == "" {
169169+ m.log.Warn(sub.DID + "ResolutionFailure no service endpoint")
170170+ } else {
171171+ sub.ServiceURL = labelerURL
172172+ m.log.Info("Initial DID resolved successfully", "did", sub.DID, "service_url", sub.ServiceURL)
173173+ }
174174+ }
175175+ }
176176+ m.startWorker(sub)
177177+ }
178178+179179+ m.log.Info("Labeler added", "did", did, "start_cursor", cursor)
180180+ return nil
181181+}
182182+183183+// RemoveLabeler stops the stream worker for the given DID and removes it from the manager.
184184+func (m *LabelerSubscriptionManager) RemoveLabeler(did string) {
185185+ m.subMu.Lock()
186186+ defer m.subMu.Unlock()
187187+188188+ sub, exists := m.subscriptions[did]
189189+ if !exists {
190190+ return
191191+ }
192192+193193+ m.log.Info("Removing labeler subscription", "did", did)
194194+ sub.WorkerCancel()
195195+ delete(m.subscriptions, did)
196196+}
197197+198198+// startWorker spins up the connection management goroutine for a single labeler.
199199+func (m *LabelerSubscriptionManager) startWorker(sub *SubscriptionContext) {
200200+ sub.mu.Lock()
201201+ if sub.IsRunning {
202202+ sub.mu.Unlock()
203203+ return
204204+ }
205205+ sub.IsRunning = true
206206+ sub.mu.Unlock()
207207+208208+ m.wg.Add(1)
209209+ go func() {
210210+ defer m.wg.Done()
211211+ defer func() {
212212+ sub.mu.Lock()
213213+ sub.IsRunning = false
214214+ sub.mu.Unlock()
215215+ }()
216216+ m.manageSubscription(sub)
217217+ }()
218218+}
219219+220220+// manageSubscription handles connection retries, cursors, and running the stream processor.
221221+func (m *LabelerSubscriptionManager) manageSubscription(sub *SubscriptionContext) {
222222+ didLog := m.log.With("did", sub.DID)
223223+ didLog.Info("Worker started")
224224+225225+ for {
226226+ select {
227227+ case <-sub.WorkerCtx.Done():
228228+ didLog.Info("Worker received stop signal, shutting down.")
229229+ return
230230+ default:
231231+ // Proceed
232232+ }
233233+ var err error
234234+235235+ // 1. Ensure ServiceURL is resolved
236236+ if sub.ServiceURL == "" {
237237+ didLog.Info("Service URL not resolved, attempting DID resolution.")
238238+ // entry, err := m.resolver.Resolve(sub.DID)
239239+ // if err != nil {
240240+ // didLog.Error("DID resolution failed, retrying...", "err", err)
241241+ // goto WaitAndRetry
242242+ // }
243243+ // sub.ServiceURL = entry.Domain
244244+ // didLog.Info("DID resolution successful", "service_url", sub.ServiceURL)
245245+ did, err1 := syntax.ParseDID(sub.DID)
246246+ if err1 != nil {
247247+ m.log.Warn(sub.DID + "ResolutionFailure invalid did")
248248+ } else {
249249+ ident, err2 := m.resolver.LookupDID(context.TODO(), did)
250250+ if err2 != nil {
251251+ m.log.Warn(sub.DID + "ResolutionFailure not reachable")
252252+ goto WaitAndRetry
253253+ } else {
254254+ labelerURL := ident.GetServiceEndpoint("atproto_labeler")
255255+ if labelerURL == "" {
256256+ m.log.Warn(sub.DID + "ResolutionFailure no service endpoint")
257257+ } else {
258258+ sub.ServiceURL = labelerURL
259259+ m.log.Info("Initial DID resolved successfully", "did", sub.DID, "service_url", sub.ServiceURL)
260260+ }
261261+ }
262262+ }
263263+ }
264264+265265+ // 2. Attempt to stream
266266+ err = m.dialAndStream(sub)
267267+268268+ if sub.WorkerCtx.Err() != nil {
269269+ return
270270+ }
271271+272272+ if err != nil {
273273+ didLog.Error("Stream failed, attempting restart", "err", err, "cursor", sub.CurrentCursor)
274274+ } else {
275275+ didLog.Info("Stream closed cleanly, attempting restart.")
276276+ }
277277+278278+ WaitAndRetry:
279279+ // Wait before retrying
280280+ didLog.Info("Waiting 5s before reconnecting/retrying resolution...")
281281+ select {
282282+ case <-time.After(5 * time.Second):
283283+ // Proceed with retry
284284+ case <-sub.WorkerCtx.Done():
285285+ return // Exit if canceled during wait
286286+ }
287287+ }
288288+}
289289+290290+func httpToWS(s string) string {
291291+ if strings.HasPrefix(s, "https://") {
292292+ return "wss://" + s[len("https://"):]
293293+ }
294294+ if strings.HasPrefix(s, "http://") {
295295+ return "ws://" + s[len("http://"):]
296296+ }
297297+ return s
298298+}
299299+300300+// dialAndStream establishes the WebSocket connection and processes the stream.
301301+func (m *LabelerSubscriptionManager) dialAndStream(sub *SubscriptionContext) error {
302302+ didLog := m.log.With("did", sub.DID)
303303+304304+ fullURL := httpToWS(sub.ServiceURL) + "/" + LabelSubscribePath
305305+ if sub.CurrentCursor != "" {
306306+ fullURL = fmt.Sprintf("%s?cursor=%s", fullURL, sub.CurrentCursor)
307307+ }
308308+309309+ u, err := url.Parse(fullURL)
310310+ if err != nil {
311311+ return fmt.Errorf("failed to parse URL: %w", err)
312312+ }
313313+314314+ // 1. Establish WebSocket Connection
315315+ dialer := websocket.DefaultDialer
316316+ con, resp, err := dialer.Dial(u.String(), http.Header{
317317+ "User-Agent": []string{"LabelerSubscriptionManager/1.0"},
318318+ })
319319+320320+ if err != nil {
321321+ if resp != nil {
322322+ didLog.Error("WebSocket connection failed", "status", resp.StatusCode)
323323+ }
324324+ // If dial fails due to network/DNS/TLS error, clear the service URL to force re-resolution
325325+ // on the next loop iteration.
326326+ sub.mu.Lock()
327327+ sub.ServiceURL = ""
328328+ sub.mu.Unlock()
329329+330330+ return fmt.Errorf("failed to dial websocket to %s: %w", u.String(), err)
331331+ }
332332+ defer con.Close()
333333+ didLog.Info("Successfully connected to Labeler firehose", "url", u.String())
334334+335335+ // 2. Define Event Callbacks
336336+ rsc := &events.RepoStreamCallbacks{
337337+ LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error {
338338+ if evt.Seq == 0 || evt.Labels == nil {
339339+ return nil
340340+ }
341341+342342+ // Update the cursor immediately
343343+ sub.mu.Lock()
344344+ sub.CurrentCursor = strconv.FormatInt(evt.Seq, 10)
345345+ sub.mu.Unlock()
346346+347347+ // Process and simplify each label event
348348+ for _, label := range evt.Labels {
349349+ select {
350350+ case m.eventCh <- LabelEvent{
351351+ SourceDid: sub.DID,
352352+ Cursor: evt.Seq,
353353+ Value: label,
354354+ }:
355355+ // Sent successfully
356356+ default:
357357+ didLog.Warn("Event channel full, dropping label event", "seq", evt.Seq, "uri", label.Uri)
358358+ }
359359+ }
360360+ return nil
361361+ },
362362+363363+ LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error {
364364+ if evt.Message != nil {
365365+ didLog.Info("Stream Info Message", "name", evt.Name, "message", *evt.Message)
366366+ }
367367+ return nil
368368+ },
369369+370370+ Error: func(evt *events.ErrorFrame) error {
371371+ didLog.Error("Stream processing error frame", "error_type", evt.Error, "message", evt.Message)
372372+ return fmt.Errorf("atproto stream error: %s", evt.Message)
373373+ },
374374+ }
375375+376376+ // 3. Create Scheduler and Start Processing
377377+ scheduler := parallel.NewScheduler(
378378+ MaxWorkers,
379379+ EventChannelCapacity,
380380+ fullURL,
381381+ rsc.EventHandler,
382382+ )
383383+384384+ // HandleRepoStream blocks until the connection closes or the context cancels.
385385+ return events.HandleRepoStream(sub.WorkerCtx, con, scheduler, didLog)
386386+}
+22
license
···11+MIT License
22+33+Copyright (c) 2025 Whey and contributors.
44+55+Permission is hereby granted, free of charge, to any person obtaining a copy
66+of this software and associated documentation files (the "Software"), to deal
77+in the Software without restriction, including without limitation the rights
88+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
99+copies of the Software, and to permit persons to whom the Software is
1010+furnished to do so, subject to the following conditions:
1111+1212+The above copyright notice and this permission notice shall be included in all
1313+copies or substantial portions of the Software.
1414+1515+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1616+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1717+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1818+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1919+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2020+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121+SOFTWARE.
2222+
-81
main.go
···11-package main
22-33-import (
44- "context"
55- "fmt"
66- "log"
77- "net/http"
88- "os"
99- "time"
1010-1111- "tangled.org/whey.party/rdcs/microcosm/constellation"
1212- "tangled.org/whey.party/rdcs/microcosm/slingshot"
1313- "tangled.org/whey.party/rdcs/sticket"
1414-1515- // "github.com/bluesky-social/indigo/atproto/atclient"
1616- // comatproto "github.com/bluesky-social/indigo/api/atproto"
1717- // appbsky "github.com/bluesky-social/indigo/api/bsky"
1818- // "github.com/bluesky-social/indigo/atproto/atclient"
1919- // "github.com/bluesky-social/indigo/atproto/identity"
2020- // "github.com/bluesky-social/indigo/atproto/syntax"
2121- "github.com/bluesky-social/indigo/api/agnostic"
2222- // "github.com/bluesky-social/jetstream/pkg/models"
2323-)
2424-2525-const (
2626- JETSTREAM_URL = "ws://localhost:6008/subscribe"
2727- SPACEDUST_URL = "ws://localhost:9998/subscribe"
2828- SLINGSHOT_URL = "http://localhost:7729"
2929- CONSTELLATION_URL = "http://localhost:7728"
3030-)
3131-3232-func main() {
3333- fmt.Fprintf(os.Stdout, "RDCS started")
3434-3535- ctx := context.Background()
3636- mailbox := sticket.New()
3737- sl := slingshot.NewSlingshot(SLINGSHOT_URL)
3838- cs := constellation.NewConstellation(CONSTELLATION_URL)
3939- // spacedust is type definitions only
4040- // jetstream types is probably available from jetstream/pkg/models
4141-4242- responsewow, _ := agnostic.RepoGetRecord(ctx, sl, "", "app.bsky.feed.profile", "did:plc:44ybard66vv44zksje25o7dz", "self")
4343-4444- fmt.Fprintf(os.Stdout, responsewow.Uri)
4545-4646- http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
4747- mailbox.HandleWS(&w, r)
4848- })
4949-5050- http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
5151- fmt.Fprintf(w, "hello worldio !")
5252- clientUUID := sticket.GetUUIDFromRequest(r)
5353- hasSticket := clientUUID != ""
5454- if hasSticket {
5555- go func(targetUUID string) {
5656- // simulated heavy processing
5757- time.Sleep(2 * time.Second)
5858-5959- lateData := map[string]any{
6060- "postId": 101,
6161- "newComments": []string{
6262- "Wow great tutorial!",
6363- "I am stuck on step 1.",
6464- },
6565- }
6666-6767- success := mailbox.SendToClient(targetUUID, "post_thread_update", lateData)
6868- if success {
6969- log.Println("Successfully sent late data via Sticket")
7070- } else {
7171- log.Println("Failed to send late data (client disconnected?)")
7272- }
7373- }(clientUUID)
7474- }
7575- })
7676- http.ListenAndServe(":7152", nil)
7777-}
7878-7979-func getPostThreadV2(w http.ResponseWriter, r *http.Request) {
8080- fmt.Fprintf(w, "hello worldio !")
8181-}
···11+# Red Dwarf Server
22+33+Golang Monorepo for all server-sided Red Dwarf stuff
44+55+> [!NOTE]
66+> uh im not very confident with the current directory structure, so files and folders might move around
77+88+## Runnables
99+run all of these using `go run .` inside the respective directories
1010+1111+### `/cmd/appview`
1212+an appview, the api server that implements app.bsky.* XRPC methods.
1313+1414+development of the appview itself is on hold, but other parts of this red dwarf server repo are still being developed and will be used by the appview, probably, eventually
1515+1616+still very early in development
1717+1818+implemented routes:
1919+- `app.bsky.actor.getProfiles`
2020+- `app.bsky.actor.getProfile`
2121+- `app.bsky.notification.listNotifications` (placeholder)
2222+- `app.bsky.labeler.getServices`
2323+- `app.bsky.feed.getFeedGenerators`
2424+- `app.bsky.feed.getPosts` (post rendering is incomplete)
2525+- `app.bsky.feed.getFeed` (post rendering is incomplete)
2626+- `app.bsky.unspecced.getConfig` (placeholder)
2727+- `app.bsky.unspecced.getPostThreadV2` (mostly working! doesnt use prefered sort, not performant yet)
2828+2929+3030+### `/cmd/labelmerge`
3131+queryLabel cache. uses a different XRPC method than the default queryLabels endpoint
3232+3333+- `/xrpc/app.reddwarf.labelmerge.queryLabels`
3434+3535+the full lexicon schema is [here](/labelmerge/lex/generation/defs/app.reddwarf.labelmerge.queryLabels.json)
3636+3737+### `/cmd/backstream`
3838+experimental backfiller that kinda (but not really) conforms to the jetstream event shape. designed to be ingested by consumers expecting jetstream
3939+4040+### `/cmd/aturilist`
4141+experimental listRecords replacement. is not backfilled. uses the official jetstream go client, which means it suffers from this [bug](https://github.com/bluesky-social/jetstream/pull/45)
4242+4343+## Packages
4444+4545+### `/auth`
4646+taken from [go-bsky-feed-generator](https://github.com/jazware/go-bsky-feed-generator) but modified a bit.
4747+4848+handles all of the auth, modified to have a more lenient version to make `getFeed` work
4949+5050+### `/microcosm/*`
5151+microcosm api clients, implements constellation slingshot and spacedust
5252+5353+slingshot's api client is compatible with `github.com/bluesky-social/indigo/*` stuff, like `agnostic.RepoGetRecord` and `util.LexClient`
5454+5555+### `/labelmerge/*`
5656+labelmerge helpers, like:
5757+- a badger LRU (of unknown reliability)
5858+- labeler firehose ingester manager (still WIP, and unused)
5959+- lexicon generation files and the generated structs
6060+6161+### `/shims/*`
6262+most of Red Dwarf Server logic lives here. pulls data from upstream services like microcosm constellation and slingshot, transforms it, and spits out bsky api -like responses using the published app.bsky.* codegen from `github.com/bluesky-social/indigo/api/bsky`
6363+6464+6565+### `/sticket`
6666+unused leftover sorry
6767+6868+6969+### `/store`
7070+unused leftover sorry
7171+7272+## todo
7373+7474+- implement the many other parts of labelmerge
7575+ - labeler firehose ingester, which will be used for:
7676+ - keep caches up to date via firehose ingester
7777+ - rolling cache of the latest few hours of labels
7878+ - which will need a "was this record made in the latest few hours of labels" helper service
7979+- clean up /cmd/appview/main.go , its a mess
8080+- appview-side query caches
8181+- notification service
8282+- bookmarks service
8383+- create aturilist service
8484+- make backstream usable
8585+- create jetrelay service