+3
atkafka/atkafka.go
+3
atkafka/atkafka.go
···
28
28
29
29
type Server struct {
30
30
relayHost string
31
+
tapHost string
31
32
bootstrapServers []string
32
33
outputTopic string
33
34
ospreyCompat bool
···
47
48
type ServerArgs struct {
48
49
// network params
49
50
RelayHost string
51
+
TapHost string
50
52
PlcHost string
51
53
ApiHost string
52
54
···
113
115
114
116
s := &Server{
115
117
relayHost: args.RelayHost,
118
+
tapHost: args.TapHost,
116
119
plcClient: plcClient,
117
120
apiClient: apiClient,
118
121
bootstrapServers: args.BootstrapServers,
+268
atkafka/tap.go
+268
atkafka/tap.go
···
1
+
package atkafka
2
+
3
+
import (
4
+
"context"
5
+
"encoding/json"
6
+
"fmt"
7
+
"net/http"
8
+
"net/url"
9
+
"os"
10
+
"os/signal"
11
+
"strings"
12
+
"syscall"
13
+
"time"
14
+
15
+
"github.com/araddon/dateparse"
16
+
"github.com/bluesky-social/indigo/atproto/syntax"
17
+
"github.com/gorilla/websocket"
18
+
"golang.org/x/sync/semaphore"
19
+
)
20
+
21
+
func (s *Server) RunTapMode(ctx context.Context) error {
22
+
sema := semaphore.NewWeighted(1_000)
23
+
24
+
s.logger.Info("starting tap consumer", "tap-host", s.tapHost, "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic)
25
+
26
+
createCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
27
+
defer cancel()
28
+
29
+
producerLogger := s.logger.With("component", "producer")
30
+
kafProducer, err := NewProducer(createCtx, producerLogger, s.bootstrapServers, s.outputTopic,
31
+
WithEnsureTopic(true),
32
+
WithTopicPartitions(200),
33
+
)
34
+
if err != nil {
35
+
return fmt.Errorf("failed to create producer: %w", err)
36
+
}
37
+
defer kafProducer.Close()
38
+
s.producer = kafProducer
39
+
s.logger.Info("created producer")
40
+
41
+
wsDialer := websocket.DefaultDialer
42
+
u, err := url.Parse(s.tapHost)
43
+
if err != nil {
44
+
return fmt.Errorf("invalid tapHost: %w", err)
45
+
}
46
+
u.Path = "/channel"
47
+
s.logger.Info("created dialer")
48
+
49
+
wsErr := make(chan error, 1)
50
+
shutdownWs := make(chan struct{}, 1)
51
+
go func() {
52
+
logger := s.logger.With("component", "websocket")
53
+
54
+
logger.Info("subscribing to tap stream", "upstream", s.tapHost)
55
+
56
+
conn, _, err := wsDialer.Dial(u.String(), http.Header{
57
+
"User-Agent": []string{"at-kafka/0.0.0"},
58
+
})
59
+
if err != nil {
60
+
wsErr <- err
61
+
return
62
+
}
63
+
64
+
// handle events!
65
+
for {
66
+
var evt TapEvent
67
+
err := conn.ReadJSON(&evt)
68
+
if err != nil {
69
+
logger.Error("error reading json from websocket", "err", err)
70
+
break
71
+
}
72
+
73
+
if err := sema.Acquire(ctx, 1); err != nil {
74
+
logger.Error("error acquring sema", "err", err)
75
+
break
76
+
}
77
+
78
+
go func() {
79
+
defer sema.Release(1)
80
+
s.handleTapEvent(ctx, &evt)
81
+
}()
82
+
}
83
+
84
+
<-shutdownWs
85
+
86
+
wsErr <- nil
87
+
}()
88
+
s.logger.Info("created tap consumer")
89
+
90
+
signals := make(chan os.Signal, 1)
91
+
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
92
+
93
+
select {
94
+
case sig := <-signals:
95
+
s.logger.Info("shutting down on signal", "signal", sig)
96
+
case err := <-wsErr:
97
+
if err != nil {
98
+
s.logger.Error("websocket error", "err", err)
99
+
} else {
100
+
s.logger.Info("websocket shutdown unexpectedly")
101
+
}
102
+
}
103
+
104
+
close(shutdownWs)
105
+
106
+
return nil
107
+
}
108
+
109
+
func (s *Server) handleTapEvent(ctx context.Context, evt *TapEvent) error {
110
+
logger := s.logger.With("component", "handleEvent")
111
+
112
+
var collection string
113
+
var actionName string
114
+
115
+
var evtKey string
116
+
var evtsToProduce [][]byte
117
+
118
+
if evt.Record != nil {
119
+
// key events by DID
120
+
evtKey = evt.Record.Did
121
+
did := evt.Record.Did
122
+
kind := evt.Record.Action
123
+
collection = evt.Record.Collection
124
+
rkey := evt.Record.Rkey
125
+
atUri := fmt.Sprintf("at://%s/%s/%s", did, collection, rkey)
126
+
127
+
skip := false
128
+
if len(s.watchedCollections) > 0 {
129
+
skip = true
130
+
for _, watchedCollection := range s.watchedCollections {
131
+
if watchedCollection == collection || strings.HasPrefix(collection, watchedCollection+".") {
132
+
skip = false
133
+
break
134
+
}
135
+
}
136
+
} else if len(s.ignoredCollections) > 0 {
137
+
for _, ignoredCollection := range s.ignoredCollections {
138
+
if ignoredCollection == collection || strings.HasPrefix(collection, ignoredCollection+".") {
139
+
skip = true
140
+
break
141
+
}
142
+
}
143
+
}
144
+
145
+
if skip {
146
+
logger.Debug("skipping event based on collection", "collection", collection)
147
+
return nil
148
+
}
149
+
150
+
actionName = "operation#" + kind
151
+
152
+
handledEvents.WithLabelValues(actionName, collection).Inc()
153
+
154
+
// create the formatted operation
155
+
atkOp := AtKafkaOp{
156
+
Action: evt.Record.Action,
157
+
Collection: collection,
158
+
Rkey: rkey,
159
+
Uri: atUri,
160
+
Cid: evt.Record.Cid,
161
+
Path: fmt.Sprintf("%s/%s", collection, rkey),
162
+
}
163
+
164
+
if evt.Record.Record != nil {
165
+
atkOp.Record = *evt.Record.Record
166
+
}
167
+
168
+
kafkaEvt := AtKafkaEvent{
169
+
Did: did,
170
+
Operation: &atkOp,
171
+
}
172
+
173
+
if evt.Record.Record != nil {
174
+
timestamp, err := parseTimeFromRecord(collection, *evt.Record.Record, rkey)
175
+
if err != nil {
176
+
return fmt.Errorf("error getting timestamp from record: %w", err)
177
+
}
178
+
kafkaEvt.Timestamp = timestamp.Format(time.RFC3339Nano)
179
+
}
180
+
181
+
evtBytes, err := json.Marshal(&kafkaEvt)
182
+
if err != nil {
183
+
return fmt.Errorf("failed to marshal kafka event: %w", err)
184
+
}
185
+
186
+
evtsToProduce = append(evtsToProduce, evtBytes)
187
+
}
188
+
189
+
for _, evtBytes := range evtsToProduce {
190
+
if err := s.produceAsync(ctx, evtKey, evtBytes); err != nil {
191
+
return err
192
+
}
193
+
}
194
+
195
+
return nil
196
+
}
197
+
198
+
func parseTimeFromRecord(collection string, rec map[string]any, rkey string) (*time.Time, error) {
199
+
var rkeyTime time.Time
200
+
if rkey != "self" {
201
+
rt, err := syntax.ParseTID(rkey)
202
+
if err == nil {
203
+
rkeyTime = rt.Time()
204
+
}
205
+
}
206
+
207
+
switch collection {
208
+
case "app.bsky.feed.post":
209
+
cat, ok := rec["createdAt"].(string)
210
+
if ok {
211
+
t, err := dateparse.ParseAny(cat)
212
+
if err == nil {
213
+
return &t, nil
214
+
}
215
+
216
+
if rkeyTime.IsZero() {
217
+
return timePtr(time.Now()), nil
218
+
}
219
+
}
220
+
221
+
return &rkeyTime, nil
222
+
case "app.bsky.feed.repost":
223
+
cat, ok := rec["createdAt"].(string)
224
+
if ok {
225
+
t, err := dateparse.ParseAny(cat)
226
+
if err == nil {
227
+
return &t, nil
228
+
}
229
+
230
+
if rkeyTime.IsZero() {
231
+
return nil, fmt.Errorf("failed to get a useful timestamp from record")
232
+
}
233
+
}
234
+
235
+
return &rkeyTime, nil
236
+
case "app.bsky.feed.like":
237
+
cat, ok := rec["createdAt"].(string)
238
+
if ok {
239
+
t, err := dateparse.ParseAny(cat)
240
+
if err == nil {
241
+
return &t, nil
242
+
}
243
+
244
+
if rkeyTime.IsZero() {
245
+
return nil, fmt.Errorf("failed to get a useful timestamp from record")
246
+
}
247
+
}
248
+
249
+
return &rkeyTime, nil
250
+
case "app.bsky.actor.profile":
251
+
// We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one
252
+
return timePtr(time.Now()), nil
253
+
case "app.bsky.feed.generator":
254
+
if !rkeyTime.IsZero() {
255
+
return &rkeyTime, nil
256
+
}
257
+
return timePtr(time.Now()), nil
258
+
default:
259
+
if !rkeyTime.IsZero() {
260
+
return &rkeyTime, nil
261
+
}
262
+
return timePtr(time.Now()), nil
263
+
}
264
+
}
265
+
266
+
// timePtr returns a pointer to a fresh copy of t, for call sites that need a
// *time.Time built from an expression such as time.Now().
func timePtr(t time.Time) *time.Time {
	p := new(time.Time)
	*p = t
	return p
}
+25
atkafka/types.go
+25
atkafka/types.go
···
65
65
AccountAge int64 `json:"accountAge"`
66
66
Profile *bsky.ActorDefs_ProfileViewDetailed `json:"profile"`
67
67
}
68
+
69
+
type TapEvent struct {
70
+
Id int64 `json:"id"`
71
+
Type string `json:"type"`
72
+
Record *TapEventRecord `json:"record,omitempty"`
73
+
Identity *TapEventIdentity `json:"identity,omitempty"`
74
+
}
75
+
76
+
// TapEventRecord is the "record" payload of a TapEvent: one action on one
// record, addressed by Did, Collection and Rkey.
type TapEventRecord struct {
	// Live is decoded from the "live" key.
	Live bool `json:"live"`
	// Rev is decoded from the "rev" key.
	Rev string `json:"rev"`
	// Did is the DID of the repository that owns the record.
	Did string `json:"did"`
	// Collection is the record's collection, e.g. "app.bsky.feed.post".
	Collection string `json:"collection"`
	// Rkey is the record key within Collection.
	Rkey string `json:"rkey"`
	// Action names the operation applied to the record.
	Action string `json:"action"`
	// Cid is decoded from the "cid" key.
	Cid string `json:"cid"`
	// Record is the decoded record body; nil when the message has none.
	Record *map[string]any `json:"record,omitempty"`
}
86
+
87
+
// TapEventIdentity is the "identity" payload of a TapEvent.
type TapEventIdentity struct {
	// Did is decoded from the "did" key.
	Did string `json:"did"`
	// Handle is decoded from the "handle" key.
	Handle string `json:"handle"`
	// IsActive is decoded from the "isActive" key.
	IsActive bool `json:"isActive"`
	// Status is decoded from the "status" key.
	Status string `json:"status"`
}
+38
cmd/atkafka/main.go
+38
cmd/atkafka/main.go
···
102
102
103
103
return nil
104
104
},
105
+
Commands: cli.Commands{
106
+
&cli.Command{
107
+
Name: "tap-mode",
108
+
Flags: []cli.Flag{
109
+
&cli.StringFlag{
110
+
Name: "tap-host",
111
+
Usage: "Tap host to subscribe to for events",
112
+
Value: "ws://localhost:2480",
113
+
EnvVars: []string{"ATKAFKA_TAP_HOST"},
114
+
},
115
+
},
116
+
Action: func(cmd *cli.Context) error {
117
+
118
+
ctx := context.Background()
119
+
120
+
telemetry.StartMetrics(cmd)
121
+
logger := telemetry.StartLogger(cmd)
122
+
123
+
s, err := atkafka.NewServer(&atkafka.ServerArgs{
124
+
TapHost: cmd.String("tap-host"),
125
+
BootstrapServers: cmd.StringSlice("bootstrap-servers"),
126
+
OutputTopic: cmd.String("output-topic"),
127
+
WatchedCollections: cmd.StringSlice("watched-collections"),
128
+
IgnoredCollections: cmd.StringSlice("ignored-collections"),
129
+
Logger: logger,
130
+
})
131
+
if err != nil {
132
+
return fmt.Errorf("failed to create new server: %w", err)
133
+
}
134
+
135
+
if err := s.RunTapMode(ctx); err != nil {
136
+
return fmt.Errorf("error running server: %w", err)
137
+
}
138
+
139
+
return nil
140
+
},
141
+
},
142
+
},
105
143
}
106
144
107
145
if err := app.Run(os.Args); err != nil {
+63
docker-compose.tap.yml
+63
docker-compose.tap.yml
···
1
+
services:
2
+
zookeeper:
3
+
image: confluentinc/cp-zookeeper:7.6.0
4
+
hostname: zookeeper
5
+
container_name: zookeeper
6
+
ports:
7
+
- "2181:2181"
8
+
environment:
9
+
ZOOKEEPER_CLIENT_PORT: 2181
10
+
ZOOKEEPER_TICK_TIME: 2000
11
+
volumes:
12
+
- zookeeper-data:/var/lib/zookeeper/data
13
+
- zookeeper-logs:/var/lib/zookeeper/log
14
+
15
+
kafka:
16
+
image: confluentinc/cp-kafka:7.6.0
17
+
hostname: kafka
18
+
container_name: kafka
19
+
depends_on:
20
+
- zookeeper
21
+
ports:
22
+
- "9092:9092"
23
+
- "9101:9101"
24
+
environment:
25
+
KAFKA_BROKER_ID: 1
26
+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
27
+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
28
+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
29
+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
30
+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
31
+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
32
+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
33
+
KAFKA_JMX_PORT: 9101
34
+
KAFKA_JMX_HOSTNAME: localhost
35
+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
36
+
volumes:
37
+
- kafka-data:/var/lib/kafka/data
38
+
39
+
atkafka:
40
+
build:
41
+
context: .
42
+
dockerfile: Dockerfile
43
+
container_name: atkafka
44
+
depends_on:
45
+
- kafka
46
+
ports:
47
+
- "2112:2112"
48
+
environment:
49
+
ATKAFKA_RELAY_HOST: "wss://bsky.network"
50
+
ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
51
+
ATKAFKA_OUTPUT_TOPIC: "atproto-events"
52
+
ATKAFKA_OSPREY_COMPATIBLE: "false"
53
+
ATKAFKA_PLC_HOST: "https://plc.directory"
54
+
55
+
ATKAFKA_WATCHED_SERVICES: "*.bsky.network"
56
+
57
+
ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*"
58
+
restart: unless-stopped
59
+
60
+
volumes:
61
+
zookeeper-data:
62
+
zookeeper-logs:
63
+
kafka-data:
+2
-1
go.mod
+2
-1
go.mod
···
3
3
go 1.25.4
4
4
5
5
require (
6
+
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
6
7
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934
7
8
github.com/bluesky-social/indigo v0.0.0-20251125184450-35c1e15d2e5f
8
9
github.com/gorilla/websocket v1.5.3
···
11
12
github.com/prometheus/client_golang v1.23.2
12
13
github.com/twmb/franz-go v1.19.5
13
14
github.com/urfave/cli/v2 v2.25.7
15
+
golang.org/x/sync v0.16.0
14
16
golang.org/x/time v0.6.0
15
17
)
16
18
···
95
97
go.yaml.in/yaml/v2 v2.4.2 // indirect
96
98
golang.org/x/crypto v0.40.0 // indirect
97
99
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
98
-
golang.org/x/sync v0.16.0 // indirect
99
100
golang.org/x/sys v0.35.0 // indirect
100
101
golang.org/x/text v0.28.0 // indirect
101
102
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
+5
go.sum
+5
go.sum
···
3
3
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
4
4
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM=
5
5
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
6
+
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
7
+
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
6
8
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
7
9
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
8
10
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
···
200
202
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
201
203
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
202
204
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
205
+
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
203
206
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
204
207
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
205
208
github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
···
249
252
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
250
253
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
251
254
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
255
+
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
252
256
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
253
257
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
254
258
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
255
259
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
256
260
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
257
261
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
262
+
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
258
263
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
259
264
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
260
265
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=