Monorepo for Tangled
at master 170 lines 3.8 kB view raw
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tapc 4 5import ( 6 "bytes" 7 "context" 8 "encoding/json" 9 "fmt" 10 "net/http" 11 "net/url" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/gorilla/websocket" 16 "tangled.org/core/log" 17) 18 19type Handler interface { 20 OnEvent(ctx context.Context, evt Event) error 21 OnError(ctx context.Context, err error) 22} 23 24type Client struct { 25 Url string 26 AdminPassword string 27 HTTPClient *http.Client 28} 29 30func NewClient(url, adminPassword string) Client { 31 return Client{ 32 Url: url, 33 AdminPassword: adminPassword, 34 HTTPClient: &http.Client{}, 35 } 36} 37 38func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 39 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 40 if err != nil { 41 return err 42 } 43 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 44 if err != nil { 45 return err 46 } 47 req.SetBasicAuth("admin", c.AdminPassword) 48 req.Header.Set("Content-Type", "application/json") 49 50 resp, err := c.HTTPClient.Do(req) 51 if err != nil { 52 return err 53 } 54 defer resp.Body.Close() 55 if resp.StatusCode != http.StatusOK { 56 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 57 } 58 return nil 59} 60 61func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 62 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 63 if err != nil { 64 return err 65 } 66 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 67 if err != nil { 68 return err 69 } 70 req.SetBasicAuth("admin", c.AdminPassword) 71 req.Header.Set("Content-Type", "application/json") 72 73 resp, err := c.HTTPClient.Do(req) 74 if err != nil { 75 return err 76 } 77 defer resp.Body.Close() 78 if resp.StatusCode != http.StatusOK { 79 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 80 } 81 return nil 82} 83 84func (c *Client) Connect(ctx context.Context, handler Handler) error { 85 l := log.FromContext(ctx) 86 87 u, err := url.Parse(c.Url) 88 if err != nil { 89 return err 90 } 91 if u.Scheme == "https" { 92 u.Scheme = "wss" 93 } else { 94 u.Scheme = "ws" 95 } 96 u.Path = "/channel" 97 98 // TODO: set auth on dial 99 100 url := u.String() 101 102 var backoff int 103 for { 104 select { 105 case <-ctx.Done(): 106 return ctx.Err() 107 default: 108 } 109 110 header := http.Header{ 111 "Authorization": []string{""}, 112 } 113 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 114 if err != nil { 115 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 116 time.Sleep(time.Duration(5+backoff) * time.Second) 117 backoff++ 118 119 continue 120 } 121 l.Info("connected to tap service") 122 123 l.Info("tap event subscription response", "code", res.StatusCode) 124 125 if err = c.handleConnection(ctx, conn, handler); err != nil { 126 l.Warn("tap connection failed", "err", err, "backoff", backoff) 127 } 128 } 129} 130 131func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 132 l := log.FromContext(ctx) 133 134 defer func() { 135 conn.Close() 136 l.Warn("closed tap conection") 137 }() 138 l.Info("established tap conection") 139 140 for { 141 select { 142 case <-ctx.Done(): 143 return ctx.Err() 144 default: 145 } 146 _, message, err := conn.ReadMessage() 147 if err != nil { 148 return err 149 } 150 151 var ev Event 152 if err := json.Unmarshal(message, &ev); err != nil { 153 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 154 continue 155 } 156 if err := handler.OnEvent(ctx, ev); err != nil { 157 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 158 continue 159 } 160 161 ack := map[string]any{ 162 "type": "ack", 163 "id": ev.ID, 164 } 165 if err := conn.WriteJSON(ack); err != nil { 166 l.Warn("failed to send ack", "err", err) 167 continue 168 } 169 } 170}