Signed-off-by: Seongmin Lee git@boltless.me
+265
Diff
round #1
+3
tapc/readme.md
+3
tapc/readme.md
+24
tapc/simpleIndexer.go
+24
tapc/simpleIndexer.go
···
1
+
package tapc
2
+
3
+
import "context"
4
+
5
+
type SimpleIndexer struct {
6
+
EventHandler func(ctx context.Context, evt Event) error
7
+
ErrorHandler func(ctx context.Context, err error)
8
+
}
9
+
10
+
var _ Handler = (*SimpleIndexer)(nil)
11
+
12
+
func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error {
13
+
if i.EventHandler == nil {
14
+
return nil
15
+
}
16
+
return i.EventHandler(ctx, evt)
17
+
}
18
+
19
+
func (i *SimpleIndexer) OnError(ctx context.Context, err error) {
20
+
if i.ErrorHandler == nil {
21
+
return
22
+
}
23
+
i.ErrorHandler(ctx, err)
24
+
}
+176
tapc/tap.go
+176
tapc/tap.go
···
1
+
/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
+
3
+
package tapc
4
+
5
+
import (
6
+
"bytes"
7
+
"context"
8
+
"encoding/json"
9
+
"fmt"
10
+
"net/http"
11
+
"net/url"
12
+
"time"
13
+
14
+
"github.com/bluesky-social/indigo/atproto/syntax"
15
+
"github.com/gorilla/websocket"
16
+
"tangled.org/core/log"
17
+
)
18
+
19
+
// type WebsocketOptions struct {
20
+
// maxReconnectSeconds int
21
+
// heartbeatIntervalMs int
22
+
// // onReconnectError
23
+
// }
24
+
25
+
type Handler interface {
26
+
OnEvent(ctx context.Context, evt Event) error
27
+
OnError(ctx context.Context, err error)
28
+
}
29
+
30
+
type Client struct {
31
+
Url string
32
+
AdminPassword string
33
+
HTTPClient *http.Client
34
+
}
35
+
36
+
func NewClient(url, adminPassword string) Client {
37
+
return Client{
38
+
Url: url,
39
+
AdminPassword: adminPassword,
40
+
HTTPClient: &http.Client{},
41
+
}
42
+
}
43
+
44
+
func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
45
+
body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
46
+
if err != nil {
47
+
return err
48
+
}
49
+
req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
50
+
if err != nil {
51
+
return err
52
+
}
53
+
req.SetBasicAuth("admin", c.AdminPassword)
54
+
req.Header.Set("Content-Type", "application/json")
55
+
56
+
resp, err := c.HTTPClient.Do(req)
57
+
if err != nil {
58
+
return err
59
+
}
60
+
defer resp.Body.Close()
61
+
if resp.StatusCode != http.StatusOK {
62
+
return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
63
+
}
64
+
return nil
65
+
}
66
+
67
+
func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
68
+
body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
69
+
if err != nil {
70
+
return err
71
+
}
72
+
req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
73
+
if err != nil {
74
+
return err
75
+
}
76
+
req.SetBasicAuth("admin", c.AdminPassword)
77
+
req.Header.Set("Content-Type", "application/json")
78
+
79
+
resp, err := c.HTTPClient.Do(req)
80
+
if err != nil {
81
+
return err
82
+
}
83
+
defer resp.Body.Close()
84
+
if resp.StatusCode != http.StatusOK {
85
+
return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
86
+
}
87
+
return nil
88
+
}
89
+
90
+
func (c *Client) Connect(ctx context.Context, handler Handler) error {
91
+
l := log.FromContext(ctx)
92
+
93
+
u, err := url.Parse(c.Url)
94
+
if err != nil {
95
+
return err
96
+
}
97
+
if u.Scheme == "https" {
98
+
u.Scheme = "wss"
99
+
} else {
100
+
u.Scheme = "ws"
101
+
}
102
+
u.Path = "/channel"
103
+
104
+
// TODO: set auth on dial
105
+
106
+
url := u.String()
107
+
108
+
var backoff int
109
+
for {
110
+
select {
111
+
case <-ctx.Done():
112
+
return ctx.Err()
113
+
default:
114
+
}
115
+
116
+
header := http.Header{
117
+
"Authorization": []string{""},
118
+
}
119
+
conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
120
+
if err != nil {
121
+
l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
122
+
time.Sleep(time.Duration(5+backoff) * time.Second)
123
+
backoff++
124
+
125
+
continue
126
+
}
127
+
l.Info("connected to tap service")
128
+
129
+
l.Info("tap event subscription response", "code", res.StatusCode)
130
+
131
+
if err = c.handleConnection(ctx, conn, handler); err != nil {
132
+
l.Warn("tap connection failed", "err", err, "backoff", backoff)
133
+
}
134
+
}
135
+
}
136
+
137
+
func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
138
+
l := log.FromContext(ctx)
139
+
140
+
defer func() {
141
+
conn.Close()
142
+
l.Warn("closed tap conection")
143
+
}()
144
+
l.Info("established tap conection")
145
+
146
+
for {
147
+
select {
148
+
case <-ctx.Done():
149
+
return ctx.Err()
150
+
default:
151
+
}
152
+
_, message, err := conn.ReadMessage()
153
+
if err != nil {
154
+
return err
155
+
}
156
+
157
+
var ev Event
158
+
if err := json.Unmarshal(message, &ev); err != nil {
159
+
handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
160
+
continue
161
+
}
162
+
if err := handler.OnEvent(ctx, ev); err != nil {
163
+
handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
164
+
continue
165
+
}
166
+
167
+
ack := map[string]any{
168
+
"type": "ack",
169
+
"id": ev.ID,
170
+
}
171
+
if err := conn.WriteJSON(ack); err != nil {
172
+
l.Warn("failed to send ack", "err", err)
173
+
continue
174
+
}
175
+
}
176
+
}
+62
tapc/types.go
+62
tapc/types.go
···
1
+
package tapc
2
+
3
+
import (
4
+
"encoding/json"
5
+
"fmt"
6
+
7
+
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
)
9
+
10
+
type EventType string
11
+
12
+
const (
13
+
EvtRecord EventType = "record"
14
+
EvtIdentity EventType = "identity"
15
+
)
16
+
17
+
type Event struct {
18
+
ID int64 `json:"id"`
19
+
Type EventType `json:"type"`
20
+
Record *RecordEventData `json:"record,omitempty"`
21
+
Identity *IdentityEventData `json:"identity,omitempty"`
22
+
}
23
+
24
+
type RecordEventData struct {
25
+
Live bool `json:"live"`
26
+
Did syntax.DID `json:"did"`
27
+
Rev string `json:"rev"`
28
+
Collection syntax.NSID `json:"collection"`
29
+
Rkey syntax.RecordKey `json:"rkey"`
30
+
Action RecordAction `json:"action"`
31
+
Record json.RawMessage `json:"record,omitempty"`
32
+
CID *syntax.CID `json:"cid,omitempty"`
33
+
}
34
+
35
+
func (r *RecordEventData) AtUri() syntax.ATURI {
36
+
return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey))
37
+
}
38
+
39
+
type RecordAction string
40
+
41
+
const (
42
+
RecordCreateAction RecordAction = "create"
43
+
RecordUpdateAction RecordAction = "update"
44
+
RecordDeleteAction RecordAction = "delete"
45
+
)
46
+
47
+
type IdentityEventData struct {
48
+
DID syntax.DID `json:"did"`
49
+
Handle string `json:"handle"`
50
+
IsActive bool `json:"is_active"`
51
+
Status RepoStatus `json:"status"`
52
+
}
53
+
54
+
type RepoStatus string
55
+
56
+
const (
57
+
RepoStatusActive RepoStatus = "active"
58
+
RepoStatusTakendown RepoStatus = "takendown"
59
+
RepoStatusSuspended RepoStatus = "suspended"
60
+
RepoStatusDeactivated RepoStatus = "deactivated"
61
+
RepoStatusDeleted RepoStatus = "deleted"
62
+
)
History
5 rounds
2 comments
boltless.me
submitted
#4
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
2/3 timeout, 1/3 success
expand
collapse
expand 0 comments
pull request successfully merged
boltless.me
submitted
#3
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 0 comments
boltless.me
submitted
#2
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 2 comments
tapc/tap.go:19-23: can be removed
good point out, I'll clean it up.
tapc/tap.go:104: do we need to do this still?
As a published go package, yes. We can harden the connection by using tap admin password. Not strictly necessary for our infra I guess..?
boltless.me
submitted
#0
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
tapc/tap.go:19-23: can be removedtapc/tap.go:104: do we need to do this still?lgtm otherwise!