Signed-off-by: Seongmin Lee git@boltless.me
+259
Diff
round #4
+3
tapc/readme.md
+3
tapc/readme.md
+24
tapc/simpleIndexer.go
+24
tapc/simpleIndexer.go
···
1
+
package tapc
2
+
3
+
import "context"
4
+
5
+
type SimpleIndexer struct {
6
+
EventHandler func(ctx context.Context, evt Event) error
7
+
ErrorHandler func(ctx context.Context, err error)
8
+
}
9
+
10
+
var _ Handler = (*SimpleIndexer)(nil)
11
+
12
+
func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error {
13
+
if i.EventHandler == nil {
14
+
return nil
15
+
}
16
+
return i.EventHandler(ctx, evt)
17
+
}
18
+
19
+
func (i *SimpleIndexer) OnError(ctx context.Context, err error) {
20
+
if i.ErrorHandler == nil {
21
+
return
22
+
}
23
+
i.ErrorHandler(ctx, err)
24
+
}
+170
tapc/tap.go
+170
tapc/tap.go
···
1
+
/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
+
3
+
package tapc
4
+
5
+
import (
6
+
"bytes"
7
+
"context"
8
+
"encoding/json"
9
+
"fmt"
10
+
"net/http"
11
+
"net/url"
12
+
"time"
13
+
14
+
"github.com/bluesky-social/indigo/atproto/syntax"
15
+
"github.com/gorilla/websocket"
16
+
"tangled.org/core/log"
17
+
)
18
+
19
+
type Handler interface {
20
+
OnEvent(ctx context.Context, evt Event) error
21
+
OnError(ctx context.Context, err error)
22
+
}
23
+
24
+
type Client struct {
25
+
Url string
26
+
AdminPassword string
27
+
HTTPClient *http.Client
28
+
}
29
+
30
+
func NewClient(url, adminPassword string) Client {
31
+
return Client{
32
+
Url: url,
33
+
AdminPassword: adminPassword,
34
+
HTTPClient: &http.Client{},
35
+
}
36
+
}
37
+
38
+
func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
39
+
body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
40
+
if err != nil {
41
+
return err
42
+
}
43
+
req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
44
+
if err != nil {
45
+
return err
46
+
}
47
+
req.SetBasicAuth("admin", c.AdminPassword)
48
+
req.Header.Set("Content-Type", "application/json")
49
+
50
+
resp, err := c.HTTPClient.Do(req)
51
+
if err != nil {
52
+
return err
53
+
}
54
+
defer resp.Body.Close()
55
+
if resp.StatusCode != http.StatusOK {
56
+
return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
57
+
}
58
+
return nil
59
+
}
60
+
61
+
func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
62
+
body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
63
+
if err != nil {
64
+
return err
65
+
}
66
+
req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
67
+
if err != nil {
68
+
return err
69
+
}
70
+
req.SetBasicAuth("admin", c.AdminPassword)
71
+
req.Header.Set("Content-Type", "application/json")
72
+
73
+
resp, err := c.HTTPClient.Do(req)
74
+
if err != nil {
75
+
return err
76
+
}
77
+
defer resp.Body.Close()
78
+
if resp.StatusCode != http.StatusOK {
79
+
return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
80
+
}
81
+
return nil
82
+
}
83
+
84
+
func (c *Client) Connect(ctx context.Context, handler Handler) error {
85
+
l := log.FromContext(ctx)
86
+
87
+
u, err := url.Parse(c.Url)
88
+
if err != nil {
89
+
return err
90
+
}
91
+
if u.Scheme == "https" {
92
+
u.Scheme = "wss"
93
+
} else {
94
+
u.Scheme = "ws"
95
+
}
96
+
u.Path = "/channel"
97
+
98
+
// TODO: set auth on dial
99
+
100
+
url := u.String()
101
+
102
+
var backoff int
103
+
for {
104
+
select {
105
+
case <-ctx.Done():
106
+
return ctx.Err()
107
+
default:
108
+
}
109
+
110
+
header := http.Header{
111
+
"Authorization": []string{""},
112
+
}
113
+
conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
114
+
if err != nil {
115
+
l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
116
+
time.Sleep(time.Duration(5+backoff) * time.Second)
117
+
backoff++
118
+
119
+
continue
120
+
}
121
+
l.Info("connected to tap service")
122
+
123
+
l.Info("tap event subscription response", "code", res.StatusCode)
124
+
125
+
if err = c.handleConnection(ctx, conn, handler); err != nil {
126
+
l.Warn("tap connection failed", "err", err, "backoff", backoff)
127
+
}
128
+
}
129
+
}
130
+
131
+
func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
132
+
l := log.FromContext(ctx)
133
+
134
+
defer func() {
135
+
conn.Close()
136
+
l.Warn("closed tap conection")
137
+
}()
138
+
l.Info("established tap conection")
139
+
140
+
for {
141
+
select {
142
+
case <-ctx.Done():
143
+
return ctx.Err()
144
+
default:
145
+
}
146
+
_, message, err := conn.ReadMessage()
147
+
if err != nil {
148
+
return err
149
+
}
150
+
151
+
var ev Event
152
+
if err := json.Unmarshal(message, &ev); err != nil {
153
+
handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
154
+
continue
155
+
}
156
+
if err := handler.OnEvent(ctx, ev); err != nil {
157
+
handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
158
+
continue
159
+
}
160
+
161
+
ack := map[string]any{
162
+
"type": "ack",
163
+
"id": ev.ID,
164
+
}
165
+
if err := conn.WriteJSON(ack); err != nil {
166
+
l.Warn("failed to send ack", "err", err)
167
+
continue
168
+
}
169
+
}
170
+
}
+62
tapc/types.go
+62
tapc/types.go
···
1
+
package tapc
2
+
3
+
import (
4
+
"encoding/json"
5
+
"fmt"
6
+
7
+
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
)
9
+
10
+
type EventType string
11
+
12
+
const (
13
+
EvtRecord EventType = "record"
14
+
EvtIdentity EventType = "identity"
15
+
)
16
+
17
+
type Event struct {
18
+
ID int64 `json:"id"`
19
+
Type EventType `json:"type"`
20
+
Record *RecordEventData `json:"record,omitempty"`
21
+
Identity *IdentityEventData `json:"identity,omitempty"`
22
+
}
23
+
24
+
type RecordEventData struct {
25
+
Live bool `json:"live"`
26
+
Did syntax.DID `json:"did"`
27
+
Rev string `json:"rev"`
28
+
Collection syntax.NSID `json:"collection"`
29
+
Rkey syntax.RecordKey `json:"rkey"`
30
+
Action RecordAction `json:"action"`
31
+
Record json.RawMessage `json:"record,omitempty"`
32
+
CID *syntax.CID `json:"cid,omitempty"`
33
+
}
34
+
35
+
func (r *RecordEventData) AtUri() syntax.ATURI {
36
+
return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey))
37
+
}
38
+
39
+
type RecordAction string
40
+
41
+
const (
42
+
RecordCreateAction RecordAction = "create"
43
+
RecordUpdateAction RecordAction = "update"
44
+
RecordDeleteAction RecordAction = "delete"
45
+
)
46
+
47
+
type IdentityEventData struct {
48
+
DID syntax.DID `json:"did"`
49
+
Handle string `json:"handle"`
50
+
IsActive bool `json:"is_active"`
51
+
Status RepoStatus `json:"status"`
52
+
}
53
+
54
+
type RepoStatus string
55
+
56
+
const (
57
+
RepoStatusActive RepoStatus = "active"
58
+
RepoStatusTakendown RepoStatus = "takendown"
59
+
RepoStatusSuspended RepoStatus = "suspended"
60
+
RepoStatusDeactivated RepoStatus = "deactivated"
61
+
RepoStatusDeleted RepoStatus = "deleted"
62
+
)
History
5 rounds
2 comments
boltless.me
submitted
#4
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
2/3 timeout, 1/3 success
expand
collapse
expand 0 comments
pull request successfully merged
boltless.me
submitted
#3
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 0 comments
boltless.me
submitted
#2
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 2 comments
tapc/tap.go:19-23: can be removed
good point out, I'll clean it up.
tapc/tap.go:104: do we need to do this still?
As a published go package, yes. We can harden the connection by using tap admin password. Not strictly necessary for our infra I guess..?
boltless.me
submitted
#0
1 commit
expand
collapse
tapc: add tap client package
Signed-off-by: Seongmin Lee <git@boltless.me>
tapc/tap.go:19-23: can be removedtapc/tap.go:104: do we need to do this still?lgtm otherwise!