Monorepo for Tangled tangled.org

knotserver: init jetstream

Changed files
+206 -5
cmd
knotserver
knotserver
+1 -1
cmd/knotserver/main.go
··· 29 29 log.Fatalf("failed to setup db: %s", err) 30 30 } 31 31 32 - mux, err := knotserver.Setup(c, db) 32 + mux, err := knotserver.Setup(ctx, c, db) 33 33 if err != nil { 34 34 log.Fatal(err) 35 35 }
+1
go.mod
··· 14 14 github.com/go-git/go-git/v5 v5.12.0 15 15 github.com/google/uuid v1.6.0 16 16 github.com/gorilla/sessions v1.4.0 17 + github.com/gorilla/websocket v1.5.1 17 18 github.com/ipfs/go-cid v0.4.1 18 19 github.com/mattn/go-sqlite3 v1.14.24 19 20 github.com/microcosm-cc/bluemonday v1.0.27
+2
go.sum
··· 103 103 github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= 104 104 github.com/gorilla/sessions v1.4.0 h1:kpIYOp/oi6MG/p5PgxApU8srsSw9tuFbt46Lt7auzqQ= 105 105 github.com/gorilla/sessions v1.4.0/go.mod h1:FLWm50oby91+hl7p/wRxDth9bWSuk0qVL2emc7lT5ik= 106 + github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= 107 + github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= 106 108 github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= 107 109 github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= 108 110 github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
+41 -4
knotserver/handler.go
··· 1 1 package knotserver 2 2 3 3 import ( 4 + "context" 5 + "encoding/json" 4 6 "fmt" 7 + "log" 5 8 "net/http" 6 9 7 10 "github.com/go-chi/chi/v5" 8 11 "github.com/icyphox/bild/knotserver/config" 9 12 "github.com/icyphox/bild/knotserver/db" 13 + "github.com/icyphox/bild/knotserver/jsclient" 10 14 ) 11 15 12 - func Setup(c *config.Config, db *db.DB) (http.Handler, error) { 16 + type Handle struct { 17 + c *config.Config 18 + db *db.DB 19 + js *jsclient.JetstreamClient 20 + } 21 + 22 + func Setup(ctx context.Context, c *config.Config, db *db.DB) (http.Handler, error) { 13 23 r := chi.NewRouter() 14 24 15 25 h := Handle{ 16 26 c: c, 17 27 db: db, 28 + } 29 + 30 + err := h.StartJetstream(ctx) 31 + if err != nil { 32 + return nil, fmt.Errorf("failed to start jetstream: %w", err) 18 33 } 19 34 20 35 r.Get("/", h.Index) ··· 55 70 return r, nil 56 71 } 57 72 58 - type Handle struct { 59 - c *config.Config 60 - db *db.DB 73 + func (h *Handle) StartJetstream(ctx context.Context) error { 74 + colections := []string{"sh.bild.publicKeys"} 75 + dids := []string{} 76 + 77 + h.js = jsclient.NewJetstreamClient(colections, dids) 78 + messages, err := h.js.ReadJetstream(ctx) 79 + if err != nil { 80 + return fmt.Errorf("failed to read from jetstream: %w", err) 81 + } 82 + 83 + go func() { 84 + for msg := range messages { 85 + var data map[string]interface{} 86 + if err := json.Unmarshal(msg, &data); err != nil { 87 + log.Printf("error unmarshaling message: %v", err) 88 + continue 89 + } 90 + 91 + if kind, ok := data["kind"].(string); ok && kind == "commit" { 92 + log.Printf("commit event: %+v", data) 93 + } 94 + } 95 + }() 96 + 97 + return nil 61 98 } 62 99 63 100 func (h *Handle) Multiplex(w http.ResponseWriter, r *http.Request) {
+161
knotserver/jsclient/jetstream.go
··· 1 + package jsclient 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log" 7 + "net/url" 8 + "sync" 9 + "time" 10 + 11 + "github.com/gorilla/websocket" 12 + ) 13 + 14 + type JetstreamClient struct { 15 + collections []string 16 + dids []string 17 + conn *websocket.Conn 18 + mu sync.RWMutex 19 + reconnectCh chan struct{} 20 + } 21 + 22 + func NewJetstreamClient(collections, dids []string) *JetstreamClient { 23 + return &JetstreamClient{ 24 + collections: collections, 25 + dids: dids, 26 + reconnectCh: make(chan struct{}, 1), 27 + } 28 + } 29 + 30 + func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL { 31 + 32 + u := url.URL{ 33 + Scheme: "wss", 34 + Host: "jetstream1.us-west.bsky.network", 35 + Path: "/subscribe", 36 + RawQuery: queryParams, 37 + } 38 + 39 + fmt.Println("URL:", u.String()) 40 + return u 41 + } 42 + 43 + // UpdateCollections updates the collections list and triggers a reconnection 44 + func (j *JetstreamClient) UpdateCollections(collections []string) { 45 + j.mu.Lock() 46 + j.collections = collections 47 + j.mu.Unlock() 48 + j.triggerReconnect() 49 + } 50 + 51 + // UpdateDids updates the DIDs list and triggers a reconnection 52 + func (j *JetstreamClient) UpdateDids(dids []string) { 53 + j.mu.Lock() 54 + j.dids = dids 55 + j.mu.Unlock() 56 + j.triggerReconnect() 57 + } 58 + 59 + func (j *JetstreamClient) triggerReconnect() { 60 + select { 61 + case j.reconnectCh <- struct{}{}: 62 + default: 63 + // Channel already has a pending reconnect 64 + } 65 + } 66 + 67 + func (j *JetstreamClient) buildQueryParams(cursor int64) string { 68 + j.mu.RLock() 69 + defer j.mu.RUnlock() 70 + 71 + var collections, dids string 72 + if len(j.collections) > 0 { 73 + collections = fmt.Sprintf("wantedCollections=%s", j.collections[0]) 74 + for _, collection := range j.collections[1:] { 75 + collections += fmt.Sprintf("&wantedCollections=%s", collection) 76 + } 77 + } 78 + if len(j.dids) > 0 { 79 + for i, did := range j.dids { 80 + if i == 0 { 81 + dids = fmt.Sprintf("wantedDids=%s", did) 82 + } else { 83 + dids += fmt.Sprintf("&wantedDids=%s", did) 84 + } 85 + } 86 + } 87 + 88 + var queryStr string 89 + if collections != "" && dids != "" { 90 + queryStr = collections + "&" + dids 91 + } else if collections != "" { 92 + queryStr = collections 93 + } else if dids != "" { 94 + queryStr = dids 95 + } 96 + 97 + return queryStr 98 + } 99 + 100 + func (j *JetstreamClient) connect(cursor int64) error { 101 + queryParams := j.buildQueryParams(cursor) 102 + u := j.buildWebsocketURL(queryParams) 103 + 104 + dialer := websocket.Dialer{ 105 + HandshakeTimeout: 10 * time.Second, 106 + } 107 + 108 + conn, _, err := dialer.Dial(u.String(), nil) 109 + if err != nil { 110 + return err 111 + } 112 + 113 + if j.conn != nil { 114 + j.conn.Close() 115 + } 116 + j.conn = conn 117 + return nil 118 + } 119 + 120 + func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) { 121 + defer close(messages) 122 + defer j.conn.Close() 123 + 124 + ticker := time.NewTicker(1 * time.Second) 125 + defer ticker.Stop() 126 + 127 + for { 128 + select { 129 + case <-ctx.Done(): 130 + return 131 + case <-j.reconnectCh: 132 + // Reconnect with new parameters 133 + // cursor := time.Now().Add(-5 * time.Second).UnixMicro() 134 + if err := j.connect(0); err != nil { 135 + log.Printf("error reconnecting to jetstream: %v", err) 136 + return 137 + } 138 + case <-ticker.C: 139 + _, message, err := j.conn.ReadMessage() 140 + if err != nil { 141 + log.Printf("error reading from websocket: %v", err) 142 + return 143 + } 144 + messages <- message 145 + } 146 + } 147 + } 148 + 149 + func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) { 150 + fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro() 151 + 152 + if err := j.connect(fiveSecondsAgo); err != nil { 153 + log.Printf("error connecting to jetstream: %v", err) 154 + return nil, err 155 + } 156 + 157 + messages := make(chan []byte) 158 + go j.readMessages(ctx, messages) 159 + 160 + return messages, nil 161 + }