Monorepo for Tangled
tangled.org
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
3package tap
4
5import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "net/url"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/gorilla/websocket"
15 "tangled.org/core/log"
16)
17
18// type WebsocketOptions struct {
19// maxReconnectSeconds int
20// heartbeatIntervalMs int
21// // onReconnectError
22// }
23
24type Handler interface {
25 OnEvent(ctx context.Context, evt Event) error
26 OnError(ctx context.Context, err error)
27}
28
// Client holds the connection settings for a tap service.
type Client struct {
	// Url is the base URL of the tap service.
	Url string

	// AdminPassword is the password sent for the "admin" basic-auth user.
	AdminPassword string

	// HTTPClient sends the /repos/add and /repos/remove requests.
	HTTPClient *http.Client
}
34
35func NewClient(url, adminPassword string) Client {
36 return Client{
37 Url: url,
38 AdminPassword: adminPassword,
39 HTTPClient: &http.Client{},
40 }
41}
42
43func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
44 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
45 if err != nil {
46 return err
47 }
48 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
49 if err != nil {
50 return err
51 }
52 req.SetBasicAuth("admin", c.AdminPassword)
53 req.Header.Set("Content-Type", "application/json")
54
55 resp, err := c.HTTPClient.Do(req)
56 if err != nil {
57 return err
58 }
59 defer resp.Body.Close()
60 if resp.StatusCode != http.StatusOK {
61 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
62 }
63 return nil
64}
65
66func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
67 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
68 if err != nil {
69 return err
70 }
71 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
72 if err != nil {
73 return err
74 }
75 req.SetBasicAuth("admin", c.AdminPassword)
76 req.Header.Set("Content-Type", "application/json")
77
78 resp, err := c.HTTPClient.Do(req)
79 if err != nil {
80 return err
81 }
82 defer resp.Body.Close()
83 if resp.StatusCode != http.StatusOK {
84 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
85 }
86 return nil
87}
88
89func (c *Client) Connect(ctx context.Context, handler Handler) error {
90 l := log.FromContext(ctx)
91
92 u, err := url.Parse(c.Url)
93 if err != nil {
94 return err
95 }
96 if u.Scheme == "https" {
97 u.Scheme = "wss"
98 } else {
99 u.Scheme = "ws"
100 }
101 u.Path = "/channel"
102
103 // TODO: set auth on dial
104
105 url := u.String()
106
107 // var backoff int
108 // for {
109 // select {
110 // case <-ctx.Done():
111 // return ctx.Err()
112 // default:
113 // }
114 //
115 // header := http.Header{
116 // "Authorization": []string{""},
117 // }
118 // conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
119 // if err != nil {
120 // l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
121 // time.Sleep(time.Duration(5+backoff) * time.Second)
122 // backoff++
123 //
124 // continue
125 // } else {
126 // backoff = 0
127 // }
128 //
129 // l.Info("event subscription response", "code", res.StatusCode)
130 // }
131
132 // TODO: keep websocket connection alive
133 conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil)
134 if err != nil {
135 return err
136 }
137 defer conn.Close()
138
139 for {
140 select {
141 case <-ctx.Done():
142 return ctx.Err()
143 default:
144 }
145 _, message, err := conn.ReadMessage()
146 if err != nil {
147 return err
148 }
149
150 var ev Event
151 if err := json.Unmarshal(message, &ev); err != nil {
152 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
153 continue
154 }
155 if err := handler.OnEvent(ctx, ev); err != nil {
156 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
157 continue
158 }
159
160 ack := map[string]any{
161 "type": "ack",
162 "id": ev.ID,
163 }
164 if err := conn.WriteJSON(ack); err != nil {
165 l.Warn("failed to send ack", "err", err)
166 continue
167 }
168 }
169}