Monorepo for Tangled tangled.org
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tap 4 5import ( 6 "bytes" 7 "context" 8 "encoding/json" 9 "fmt" 10 "net/http" 11 "net/url" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/gorilla/websocket" 15 "tangled.org/core/log" 16) 17 18// type WebsocketOptions struct { 19// maxReconnectSeconds int 20// heartbeatIntervalMs int 21// // onReconnectError 22// } 23 24type Handler interface { 25 OnEvent(ctx context.Context, evt Event) error 26 OnError(ctx context.Context, err error) 27} 28 29type Client struct { 30 Url string 31 AdminPassword string 32 HTTPClient *http.Client 33} 34 35func NewClient(url, adminPassword string) Client { 36 return Client{ 37 Url: url, 38 AdminPassword: adminPassword, 39 HTTPClient: &http.Client{}, 40 } 41} 42 43func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 44 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 45 if err != nil { 46 return err 47 } 48 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 49 if err != nil { 50 return err 51 } 52 req.SetBasicAuth("admin", c.AdminPassword) 53 req.Header.Set("Content-Type", "application/json") 54 55 resp, err := c.HTTPClient.Do(req) 56 if err != nil { 57 return err 58 } 59 defer resp.Body.Close() 60 if resp.StatusCode != http.StatusOK { 61 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 62 } 63 return nil 64} 65 66func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 67 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 68 if err != nil { 69 return err 70 } 71 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 72 if err != nil { 73 return err 74 } 75 req.SetBasicAuth("admin", c.AdminPassword) 76 req.Header.Set("Content-Type", "application/json") 77 78 resp, err := c.HTTPClient.Do(req) 79 if err != nil { 80 return err 81 } 82 defer resp.Body.Close() 83 if resp.StatusCode != http.StatusOK { 84 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 85 } 86 return nil 87} 88 89func (c *Client) Connect(ctx context.Context, handler Handler) error { 90 l := log.FromContext(ctx) 91 92 u, err := url.Parse(c.Url) 93 if err != nil { 94 return err 95 } 96 if u.Scheme == "https" { 97 u.Scheme = "wss" 98 } else { 99 u.Scheme = "ws" 100 } 101 u.Path = "/channel" 102 103 // TODO: set auth on dial 104 105 url := u.String() 106 107 // var backoff int 108 // for { 109 // select { 110 // case <-ctx.Done(): 111 // return ctx.Err() 112 // default: 113 // } 114 // 115 // header := http.Header{ 116 // "Authorization": []string{""}, 117 // } 118 // conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 119 // if err != nil { 120 // l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 121 // time.Sleep(time.Duration(5+backoff) * time.Second) 122 // backoff++ 123 // 124 // continue 125 // } else { 126 // backoff = 0 127 // } 128 // 129 // l.Info("event subscription response", "code", res.StatusCode) 130 // } 131 132 // TODO: keep websocket connection alive 133 conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil) 134 if err != nil { 135 return err 136 } 137 defer conn.Close() 138 139 for { 140 select { 141 case <-ctx.Done(): 142 return ctx.Err() 143 default: 144 } 145 _, message, err := conn.ReadMessage() 146 if err != nil { 147 return err 148 } 149 150 var ev Event 151 if err := json.Unmarshal(message, &ev); err != nil { 152 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 153 continue 154 } 155 if err := handler.OnEvent(ctx, ev); err != nil { 156 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 157 continue 158 } 159 160 ack := map[string]any{ 161 "type": "ack", 162 "id": ev.ID, 163 } 164 if err := conn.WriteJSON(ack); err != nil { 165 l.Warn("failed to send ack", "err", err) 166 continue 167 } 168 } 169}