forked from
tangled.org/core
Monorepo for Tangled
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
3package tapc
4
5import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "net/url"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/gorilla/websocket"
16 "tangled.org/core/log"
17)
18
19type Handler interface {
20 OnEvent(ctx context.Context, evt Event) error
21 OnError(ctx context.Context, err error)
22}
23
// Client talks to a tap service over HTTP (repository management) and
// WebSocket (event channel).
type Client struct {
	// Url is the base URL of the tap service. Endpoint paths such as
	// "/repos/add" are appended to it, and its scheme selects between
	// "ws" and "wss" for the event channel.
	Url string

	// AdminPassword is sent as the basic-auth password for the "admin"
	// user on HTTP requests.
	AdminPassword string

	// HTTPClient performs the HTTP requests issued by AddRepos and
	// RemoveRepos.
	HTTPClient *http.Client
}
29
30func NewClient(url, adminPassword string) Client {
31 return Client{
32 Url: url,
33 AdminPassword: adminPassword,
34 HTTPClient: &http.Client{},
35 }
36}
37
38func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
39 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
40 if err != nil {
41 return err
42 }
43 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
44 if err != nil {
45 return err
46 }
47 req.SetBasicAuth("admin", c.AdminPassword)
48 req.Header.Set("Content-Type", "application/json")
49
50 resp, err := c.HTTPClient.Do(req)
51 if err != nil {
52 return err
53 }
54 defer resp.Body.Close()
55 if resp.StatusCode != http.StatusOK {
56 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
57 }
58 return nil
59}
60
61func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
62 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
63 if err != nil {
64 return err
65 }
66 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
67 if err != nil {
68 return err
69 }
70 req.SetBasicAuth("admin", c.AdminPassword)
71 req.Header.Set("Content-Type", "application/json")
72
73 resp, err := c.HTTPClient.Do(req)
74 if err != nil {
75 return err
76 }
77 defer resp.Body.Close()
78 if resp.StatusCode != http.StatusOK {
79 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
80 }
81 return nil
82}
83
84func (c *Client) Connect(ctx context.Context, handler Handler) error {
85 l := log.FromContext(ctx)
86
87 u, err := url.Parse(c.Url)
88 if err != nil {
89 return err
90 }
91 if u.Scheme == "https" {
92 u.Scheme = "wss"
93 } else {
94 u.Scheme = "ws"
95 }
96 u.Path = "/channel"
97
98 // TODO: set auth on dial
99
100 url := u.String()
101
102 var backoff int
103 for {
104 select {
105 case <-ctx.Done():
106 return ctx.Err()
107 default:
108 }
109
110 header := http.Header{
111 "Authorization": []string{""},
112 }
113 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
114 if err != nil {
115 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
116 time.Sleep(time.Duration(5+backoff) * time.Second)
117 backoff++
118
119 continue
120 }
121 l.Info("connected to tap service")
122
123 l.Info("tap event subscription response", "code", res.StatusCode)
124
125 if err = c.handleConnection(ctx, conn, handler); err != nil {
126 l.Warn("tap connection failed", "err", err, "backoff", backoff)
127 }
128 }
129}
130
131func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
132 l := log.FromContext(ctx)
133
134 defer func() {
135 conn.Close()
136 l.Warn("closed tap conection")
137 }()
138 l.Info("established tap conection")
139
140 for {
141 select {
142 case <-ctx.Done():
143 return ctx.Err()
144 default:
145 }
146 _, message, err := conn.ReadMessage()
147 if err != nil {
148 return err
149 }
150
151 var ev Event
152 if err := json.Unmarshal(message, &ev); err != nil {
153 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
154 continue
155 }
156 if err := handler.OnEvent(ctx, ev); err != nil {
157 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
158 continue
159 }
160
161 ack := map[string]any{
162 "type": "ack",
163 "id": ev.ID,
164 }
165 if err := conn.WriteJSON(ack); err != nil {
166 l.Warn("failed to send ack", "err", err)
167 continue
168 }
169 }
170}