.github/workflows/docker.yml (+1)

README.md (+62 -34)
···
  # at-kafka

- A small service that receives events from the AT firehose and produces them to Kafka. Supports standard JSON outputs as well as [Osprey](https://github.com/roostorg/osprey)
- formatted events.

  ## Usage

  ### Docker Compose

- The included `docker-compose.yml` provides a complete local stack. Edit the environment variables in the file to customize:

  ```yaml
  environment:
    ATKAFKA_RELAY_HOST: "wss://bsky.network"
    ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
    ATKAFKA_OUTPUT_TOPIC: "atproto-events"
-   ATKAFKA_OSPREY_COMPATIBLE: "false"
- ```
-
- Then start:

- ```bash
- docker compose up -d # Start services
  ```

- ### Docker Run

- For standard mode:

- ```bash
- docker run -d \
-   -e ATKAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
-   -e ATKAFKA_OUTPUT_TOPIC=atproto-events \
-   -p 2112:2112 \
-   ghcr.io/haileyok/at-kafka:latest
  ```

- For Osprey-compatible mode:

  ```bash
- docker run -d \
-   -e ATKAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
-   -e ATKAFKA_OUTPUT_TOPIC=atproto-events \
-   -e ATKAFKA_OSPREY_COMPATIBLE=true \
-   -p 2112:2112 \
-   ghcr.io/haileyok/at-kafka:latest
- ```
-
- ## Configuration

- | Flag | Environment Variable | Default | Description |
- |------|---------------------|---------|-------------|
- | `--relay-host` | `ATKAFKA_RELAY_HOST` | `wss://bsky.network` | AT Protocol relay host to connect to |
- | `--bootstrap-servers` | `ATKAFKA_BOOTSTRAP_SERVERS` | (required) | Comma-separated list of Kafka bootstrap servers |
- | `--output-topic` | `ATKAFKA_OUTPUT_TOPIC` | (required) | Kafka topic to publish events to |
- | `--osprey-compatible` | `ATKAFKA_OSPREY_COMPATIBLE` | `false` | Enable Osprey-compatible event format |

  ## Event Structure

- ### Standard Mode

  Events are structured similarly to the raw AT Protocol firehose, with one key difference: **commit events are split into individual operation events**.
···
  - `identity` - Identity/handle changes
  - `info` - Informational messages

  ## Monitoring

  The service exposes Prometheus metrics on the default metrics port.

  - `atkafka_handled_events` - Total events that are received on the firehose and handled
  - `atkafka_produced_events` - Total messages that are output on the bus
···
  # at-kafka

+ A small service that receives events from ATProto and produces them to Kafka. Supports:
+ - Firehose events from relay or [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) (for network backfill)
+ - Ozone moderation events
+ - Standard JSON and [Osprey](https://github.com/roostorg/osprey)-compatible event formats

  ## Usage

  ### Docker Compose

+ Three compose files are provided:
+
+ ```bash
+ # Firehose relay mode (default)
+ docker compose up -d
+
+ # Firehose tap mode (for backfill)
+ docker compose -f docker-compose.tap.yml up -d
+
+ # Ozone moderation events
+ docker compose -f docker-compose.ozone.yml up -d
+ ```
+
+ #### Firehose Configuration

  ```yaml
  environment:
+   # Relay/Tap connection
    ATKAFKA_RELAY_HOST: "wss://bsky.network"
+   ATKAFKA_TAP_HOST: "ws://localhost:2480"
+   ATKAFKA_DISABLE_ACKS: false
+
+   # Kafka
    ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
    ATKAFKA_OUTPUT_TOPIC: "atproto-events"
+   ATKAFKA_OSPREY_COMPATIBLE: false

+   # Filtering
+   ATKAFKA_WATCHED_SERVICES: "*.bsky.network"
+   ATKAFKA_IGNORED_SERVICES: "blacksky.app"
+   ATKAFKA_WATCHED_COLLECTIONS: "app.bsky.*"
+   ATKAFKA_IGNORED_COLLECTIONS: "fm.teal.*"
  ```

+ #### Ozone Configuration

+ ```yaml
+ environment:
+   # Ozone connection
+   ATKAFKA_OZONE_PDS_HOST: "https://pds.example.com"
+   ATKAFKA_OZONE_IDENTIFIER: "your.handle"
+   ATKAFKA_OZONE_PASSWORD: "password"
+   ATKAFKA_OZONE_LABELER_DID: "did:plc:..."

+   # Kafka
+   ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
+   ATKAFKA_OUTPUT_TOPIC: "ozone-events"
  ```

+ ### CLI

  ```bash
+ # Firehose modes
+ atkafka firehose relay --bootstrap-servers localhost:9092 --output-topic events
+ atkafka firehose tap --tap-host ws://localhost:2480 --bootstrap-servers localhost:9092 --output-topic events

+ # Ozone mode
+ atkafka ozone-events \
+   --pds-host https://pds.example.com \
+   --identifier admin@example.com \
+   --password password \
+   --labeler-did did:plc:... \
+   --bootstrap-servers localhost:9092 \
+   --output-topic ozone-events
+ ```

  ## Event Structure

+ ### Firehose Events

  Events are structured similarly to the raw AT Protocol firehose, with one key difference: **commit events are split into individual operation events**.
···
  - `identity` - Identity/handle changes
  - `info` - Informational messages

+ ### Ozone Events
+
+ Ozone events are produced as-is from the `tools.ozone.moderation.queryEvents` API response. Events include moderation actions, reports, and other moderation activity. The cursor is persisted to disk and automatically resumed on restart.
+
  ## Monitoring

  The service exposes Prometheus metrics on the default metrics port.

  - `atkafka_handled_events` - Total events that are received on the firehose and handled
  - `atkafka_produced_events` - Total messages that are output on the bus
+ - `atkafka_plc_requests` - Total number of PLC requests that were made, if applicable, and whether they were cached
+ - `atkafka_api_requests` - Total number of API requests that were made, if applicable, and whether they were cached
+ - `atkafka_cache_size` - The size of the PLC and API caches
+ - `atkafka_acks_sent` - Total acks that were sent to Tap, if applicable
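The `ATKAFKA_WATCHED_COLLECTIONS` / `ATKAFKA_IGNORED_COLLECTIONS` values above match by NSID prefix (the tap handler later in this diff checks for an exact match or `prefix + "."`). A minimal, self-contained sketch of those semantics — the `.*` suffix parsing and the function names here are illustrative, not the service's actual option-parsing code:

```go
package main

import (
	"fmt"
	"strings"
)

// matchesPattern reports whether an NSID collection matches one filter
// entry. A trailing ".*" (as in "app.bsky.*") matches the whole NSID
// subtree; anything else must match exactly.
func matchesPattern(collection, pattern string) bool {
	if prefix, ok := strings.CutSuffix(pattern, ".*"); ok {
		return collection == prefix || strings.HasPrefix(collection, prefix+".")
	}
	return collection == pattern
}

// shouldSkip applies the watched/ignored semantics: if a watched list is
// set it wins (only matching collections pass); otherwise the ignored
// list drops matching collections; with neither set, everything passes.
func shouldSkip(collection string, watched, ignored []string) bool {
	if len(watched) > 0 {
		for _, p := range watched {
			if matchesPattern(collection, p) {
				return false
			}
		}
		return true
	}
	for _, p := range ignored {
		if matchesPattern(collection, p) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(shouldSkip("app.bsky.feed.post", []string{"app.bsky.*"}, nil))      // watched: kept
	fmt.Println(shouldSkip("fm.teal.alpha.feed.play", nil, []string{"fm.teal.*"})) // ignored: skipped
}
```

Note that, per the comment in `ServerArgs`, only one of the two lists may be supplied at a time.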
atkafka/atkafka.go (+14 -3)
···
  type Server struct {
      relayHost        string
      bootstrapServers []string
      outputTopic      string
      ospreyCompat     bool
···
      plcClient *PlcClient
      apiClient *ApiClient
      logger    *slog.Logger
  }

  type ServerArgs struct {
      // network params
-     RelayHost string
-     PlcHost   string
-     ApiHost   string

      // for watched and ignored services or collections, only one list may be supplied
      // for both services and collections, wildcards are acceptable. for example:
···
      s := &Server{
          relayHost:        args.RelayHost,
          plcClient:        plcClient,
          apiClient:        apiClient,
          bootstrapServers: args.BootstrapServers,
···
  type Server struct {
      relayHost        string
+     tapHost          string
+     tapWorkers       int
+     disableAcks      bool
      bootstrapServers []string
      outputTopic      string
      ospreyCompat     bool
···
      plcClient *PlcClient
      apiClient *ApiClient
      logger    *slog.Logger
+     ws        *websocket.Conn
+     ackQueue  chan uint
  }

  type ServerArgs struct {
      // network params
+     RelayHost   string
+     TapHost     string
+     TapWorkers  int
+     DisableAcks bool
+     PlcHost     string
+     ApiHost     string

      // for watched and ignored services or collections, only one list may be supplied
      // for both services and collections, wildcards are acceptable. for example:
···
      s := &Server{
          relayHost:        args.RelayHost,
+         tapHost:          args.TapHost,
+         tapWorkers:       args.TapWorkers,
+         disableAcks:      args.DisableAcks,
          plcClient:        plcClient,
          apiClient:        apiClient,
          bootstrapServers: args.BootstrapServers,
atkafka/metrics.go (+15)
···
      Namespace: "atkafka",
      Name:      "api_requests",
  }, []string{"kind", "status", "cached"})
+
+ acksSent = promauto.NewCounterVec(prometheus.CounterOpts{
+     Namespace: "atkafka",
+     Name:      "acks_sent",
+ }, []string{"status"})
+
+ tapEvtBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
+     Namespace: "atkafka",
+     Name:      "tap_event_buffer_size",
+ })
+
+ tapAckBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
+     Namespace: "atkafka",
+     Name:      "tap_ack_event_buffer_size",
+ })
  )
atkafka/ozone.go (+337, new file)
package atkafka

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/api/ozone"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/twmb/franz-go/pkg/kgo"
)

type OzoneServer struct {
	logger           *slog.Logger
	bootstrapServers []string
	outputTopic      string
	ozoneClientArgs  *OzoneClientArgs

	cursorMu sync.Mutex
	cursor   string
}

type OzoneServerArgs struct {
	BootstrapServers []string
	OutputTopic      string
	Logger           *slog.Logger
	OzoneClientArgs  *OzoneClientArgs
}

func NewOzoneServer(args *OzoneServerArgs) (*OzoneServer, error) {
	s := OzoneServer{
		logger:           args.Logger,
		bootstrapServers: args.BootstrapServers,
		outputTopic:      args.OutputTopic,
		ozoneClientArgs:  args.OzoneClientArgs,
	}

	return &s, nil
}

func (s *OzoneServer) Run(ctx context.Context) error {
	logger := s.logger.With("name", "Run")

	if err := s.LoadCursor(); err != nil {
		logger.Error("error loading cursor", "err", err)
	}

	ozoneClient, err := NewOzoneClient(s.ozoneClientArgs)
	if err != nil {
		return err
	}

	logger.Info("creating producer", "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic)
	busProducer, err := NewProducer(
		ctx,
		s.logger.With("component", "producer"),
		s.bootstrapServers,
		s.outputTopic,
		WithEnsureTopic(true),
		WithTopicPartitions(4),
	)
	if err != nil {
		return fmt.Errorf("failed to create producer: %w", err)
	}
	defer busProducer.close()

	shutdownPoll := make(chan struct{}, 1)
	pollShutdown := make(chan struct{}, 1)
	go func() {
		ticker := time.NewTicker(1 * time.Second)

		defer func() {
			ticker.Stop()
			close(pollShutdown)
		}()

		for {
			select {
			case <-ctx.Done():
				logger.Info("shutting down poller on context cancel")
				return
			case <-shutdownPoll:
				logger.Info("shutting down poller")
				return
			case <-ticker.C:
				func() {
					ozoneClient.mu.Lock()
					defer ozoneClient.mu.Unlock()

					cursor := s.GetCursor()

					logger.Info("attempting to fetch next batch of events")

					_, events, err := ozoneClient.GetEvents(ctx, cursor)
					if err != nil {
						logger.Error("error getting events", "err", err)
						return
					}

					if len(events) == 0 {
						logger.Info("no new events to emit")
						return
					}

					var newCursor string
					for _, evt := range events {
						func() {
							status := "error"

							defer func() {
								handledEvents.WithLabelValues("ozone", status).Inc()
							}()

							b, err := json.Marshal(evt)
							if err != nil {
								logger.Error("error marshaling event", "err", err)
								return
							}

							if err := busProducer.ProduceAsync(ctx, strconv.FormatInt(evt.Id, 10), b, func(r *kgo.Record, err error) {
								produceStatus := "error"

								defer func() {
									producedEvents.WithLabelValues(produceStatus).Inc()
								}()

								if err != nil {
									logger.Error("error producing event", "err", err)
									return
								}
								produceStatus = "ok"
							}); err != nil {
								logger.Error("error attempting to produce event", "err", err)
							}

							newCursor = evt.CreatedAt

							status = "ok"
						}()
					}

					if err := s.UpdateCursor(newCursor); err != nil {
						logger.Error("error updating cursor", "err", err)
					}

					logger.Info("received events to emit", "length", len(events), "new-cursor", newCursor)
				}()
			}
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)

	select {
	case <-ctx.Done():
		logger.Warn("context cancelled")
		close(shutdownPoll)
	case sig := <-signals:
		logger.Info("received exit signal", "signal", sig)
		close(shutdownPoll)
	}

	select {
	case <-pollShutdown:
		logger.Info("shutdown successfully")
	case <-time.After(5 * time.Second):
		logger.Error("poller did not shut down within five seconds, forcing shutdown")
	}

	return nil
}

func (s *OzoneServer) UpdateCursor(cursor string) error {
	s.cursorMu.Lock()
	defer s.cursorMu.Unlock()
	s.cursor = cursor
	if err := os.WriteFile("ozone-cursor.txt", []byte(cursor), 0644); err != nil {
		return fmt.Errorf("failed to write cursor: %w", err)
	}
	return nil
}

func (s *OzoneServer) LoadCursor() error {
	logger := s.logger.With("name", "LoadCursor")

	s.cursorMu.Lock()
	defer s.cursorMu.Unlock()

	logger.Info("attempting to load stored cursor")

	b, err := os.ReadFile("ozone-cursor.txt")
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			logger.Info("no previous cursor found")
			return nil
		}
		return fmt.Errorf("failed to load cursor: %w", err)
	}
	s.cursor = strings.TrimSpace(string(b))
	logger.Info("loaded old cursor", "cursor", s.cursor)
	return nil
}

func (s *OzoneServer) GetCursor() string {
	s.cursorMu.Lock()
	defer s.cursorMu.Unlock()
	return s.cursor
}

type OzoneClient struct {
	xrpcc  *xrpc.Client
	mu     sync.Mutex
	logger *slog.Logger
}

type OzoneClientArgs struct {
	OzonePdsHost    string
	OzoneIdentifier string
	OzonePassword   string
	OzoneLabelerDid string
	Logger          *slog.Logger
}

func NewOzoneClient(args *OzoneClientArgs) (*OzoneClient, error) {
	xrpcc := &xrpc.Client{
		Host: args.OzonePdsHost,
		Headers: map[string]string{
			"atproto-proxy": fmt.Sprintf("%s#atproto_labeler", args.OzoneLabelerDid),
		},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := atproto.ServerCreateSession(ctx, xrpcc, &atproto.ServerCreateSession_Input{
		Identifier: args.OzoneIdentifier,
		Password:   args.OzonePassword,
	})
	if err != nil {
		return nil, err
	}

	xrpcc.Auth = &xrpc.AuthInfo{
		Handle:     resp.Handle,
		Did:        resp.Did,
		AccessJwt:  resp.AccessJwt,
		RefreshJwt: resp.RefreshJwt,
	}

	oc := OzoneClient{
		xrpcc:  xrpcc,
		logger: args.Logger,
	}

	go func() {
		logger := args.Logger.With("component", "refresh-routine")

		ticker := time.NewTicker(1 * time.Hour)

		defer func() {
			ticker.Stop()
		}()

		for range ticker.C {
			func() {
				logger.Info("attempting to refresh session")

				oc.mu.Lock()
				defer oc.mu.Unlock()

				xrpcc.Auth.AccessJwt = xrpcc.Auth.RefreshJwt

				refreshCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer cancel()

				resp, err := atproto.ServerRefreshSession(refreshCtx, xrpcc)
				if err != nil {
					logger.Error("error refreshing session", "err", err)
					return
				}

				xrpcc.Auth.AccessJwt = resp.AccessJwt
				xrpcc.Auth.RefreshJwt = resp.RefreshJwt

				logger.Info("session refreshed successfully")
			}()
		}
	}()

	return &oc, nil
}

func (oc *OzoneClient) GetEvents(ctx context.Context, before string) (*string, []*ozone.ModerationDefs_ModEventView, error) {
	resp, err := ozone.ModerationQueryEvents(
		ctx,
		oc.xrpcc,
		nil,    // labels array
		nil,    // tags array
		"",     // age assurance state
		"",     // batch id
		nil,    // collections array
		"",     // comment filter
		before, // created before
		"",     // created after
		"",     // created by
		"",     // cursor
		false,  // has comment
		true,   // include all records
		100,    // limit
		nil,    // mod tool array
		nil,    // policies array
		nil,    // removed labels array
		nil,    // removed tags array
		nil,    // report types array
		"asc",  // sort direction
		"",     // subject
		"",     // subject type
		nil,    // types array
		false,  // withStrike bool
	)
	if err != nil {
		return nil, nil, fmt.Errorf("error querying ozone events: %w", err)
	}

	return resp.Cursor, resp.Events, nil
}
atkafka/tap.go (+327, new file)
package atkafka

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/araddon/dateparse"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/gorilla/websocket"
	"github.com/twmb/franz-go/pkg/kgo"
)

func (s *Server) RunTapMode(ctx context.Context) error {
	s.logger.Info("starting tap consumer", "tap-host", s.tapHost, "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic)

	createCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	producerLogger := s.logger.With("component", "producer")
	kafProducer, err := NewProducer(createCtx, producerLogger, s.bootstrapServers, s.outputTopic,
		WithEnsureTopic(true),
		WithTopicPartitions(200),
	)
	if err != nil {
		return fmt.Errorf("failed to create producer: %w", err)
	}
	defer kafProducer.Close()
	s.producer = kafProducer
	s.logger.Info("created producer")

	wsDialer := websocket.DefaultDialer
	u, err := url.Parse(s.tapHost)
	if err != nil {
		return fmt.Errorf("invalid tapHost: %w", err)
	}
	u.Path = "/channel"
	s.logger.Info("created dialer")

	evtQueue := make(chan TapEvent, 10_000)
	for range s.tapWorkers {
		go func() {
			for evt := range evtQueue {
				s.handleTapEvent(ctx, &evt)
			}
		}()
	}

	ackQueue := make(chan uint, 10_000)
	go func() {
		for id := range ackQueue {
			func() {
				status := "ok"
				defer func() {
					acksSent.WithLabelValues(status).Inc()
				}()

				if !s.disableAcks {
					if err := s.ws.WriteJSON(TapAck{
						Type: "ack",
						Id:   id,
					}); err != nil {
						s.logger.Error("error sending ack", "err", err)
						status = "error"
					}
				}
			}()
		}
	}()
	s.ackQueue = ackQueue

	bufferSizeTicker := time.NewTicker(5 * time.Second)
	go func() {
		for range bufferSizeTicker.C {
			tapEvtBufferSize.Set(float64(len(evtQueue)))
			tapAckBufferSize.Set(float64(len(ackQueue)))
		}
	}()

	wsErr := make(chan error, 1)
	go func() {
		logger := s.logger.With("component", "websocket")

		logger.Info("subscribing to tap stream", "upstream", s.tapHost)

		conn, _, err := wsDialer.Dial(u.String(), http.Header{
			"User-Agent": []string{"at-kafka/0.0.0"},
		})
		if err != nil {
			wsErr <- err
			return
		}
		s.ws = conn
		defer conn.Close()

		for {
			var evt TapEvent
			err := conn.ReadJSON(&evt)
			if err != nil {
				logger.Error("error reading json from websocket", "err", err)
				wsErr <- err
				return
			}

			select {
			case evtQueue <- evt:
			case <-ctx.Done():
				wsErr <- ctx.Err()
				return
			}
		}
	}()

	s.logger.Info("created tap consumer")

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)

	select {
	case sig := <-signals:
		s.logger.Info("shutting down on signal", "signal", sig)
	case err := <-wsErr:
		if err != nil {
			s.logger.Error("websocket error", "err", err)
		} else {
			s.logger.Info("websocket shutdown unexpectedly")
		}
	}

	bufferSizeTicker.Stop()
	close(evtQueue)
	close(ackQueue)

	return nil
}

func (s *Server) handleTapEvent(ctx context.Context, evt *TapEvent) error {
	logger := s.logger.With("component", "handleEvent")

	defer func() {
		s.ackEvent(ctx, evt.Id)
	}()

	if evt.Record != nil {
		did := evt.Record.Did
		kind := evt.Record.Action
		collection := evt.Record.Collection
		rkey := evt.Record.Rkey
		atUri := fmt.Sprintf("at://%s/%s/%s", did, collection, rkey)

		skip := false
		if len(s.watchedCollections) > 0 {
			skip = true
			for _, watchedCollection := range s.watchedCollections {
				if watchedCollection == collection || strings.HasPrefix(collection, watchedCollection+".") {
					skip = false
					break
				}
			}
		} else if len(s.ignoredCollections) > 0 {
			for _, ignoredCollection := range s.ignoredCollections {
				if ignoredCollection == collection || strings.HasPrefix(collection, ignoredCollection+".") {
					skip = true
					break
				}
			}
		}

		if skip {
			logger.Debug("skipping event based on collection", "collection", collection)
			return nil
		}

		actionName := "operation#" + kind

		handledEvents.WithLabelValues(actionName, collection).Inc()

		// create the formatted operation
		atkOp := AtKafkaOp{
			Action:     evt.Record.Action,
			Collection: collection,
			Rkey:       rkey,
			Uri:        atUri,
			Cid:        evt.Record.Cid,
			Path:       fmt.Sprintf("%s/%s", collection, rkey),
		}

		if evt.Record.Record != nil {
			atkOp.Record = *evt.Record.Record
		}

		kafkaEvt := AtKafkaEvent{
			Did:       did,
			Operation: &atkOp,
		}

		if evt.Record.Record != nil {
			timestamp, err := parseTimeFromRecord(collection, *evt.Record.Record, rkey)
			if err != nil {
				return fmt.Errorf("error getting timestamp from record: %w", err)
			}
			kafkaEvt.Timestamp = timestamp.Format(time.RFC3339Nano)
		}

		evtBytes, err := json.Marshal(&kafkaEvt)
		if err != nil {
			return fmt.Errorf("failed to marshal kafka event: %w", err)
		}

		if err := s.produceAsyncTap(ctx, did, evtBytes); err != nil {
			return err
		}
	}

	return nil
}

func (s *Server) produceAsyncTap(ctx context.Context, key string, msg []byte) error {
	logger := s.logger.With("name", "produceAsyncTap", "key", key)
	callback := func(r *kgo.Record, err error) {
		status := "ok"
		defer func() {
			producedEvents.WithLabelValues(status).Inc()
		}()
		if err != nil {
			logger.Error("error producing message", "err", err)
			status = "error"
		}
	}

	if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil {
		return fmt.Errorf("failed to produce message: %w", err)
	}

	return nil
}

func (s *Server) ackEvent(ctx context.Context, id uint) {
	ackCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	select {
	case s.ackQueue <- id:
	case <-ackCtx.Done():
		s.logger.Warn("dropped ack event", "id", id, "err", ackCtx.Err())
		acksSent.WithLabelValues("dropped").Inc()
	}
}

func parseTimeFromRecord(collection string, rec map[string]any, rkey string) (*time.Time, error) {
	var rkeyTime time.Time
	if rkey != "self" {
		rt, err := syntax.ParseTID(rkey)
		if err == nil {
			rkeyTime = rt.Time()
		}
	}

	switch collection {
	case "app.bsky.feed.post":
		cat, ok := rec["createdAt"].(string)
		if ok {
			t, err := dateparse.ParseAny(cat)
			if err == nil {
				return &t, nil
			}

			if rkeyTime.IsZero() {
				return timePtr(time.Now()), nil
			}
		}

		return &rkeyTime, nil
	case "app.bsky.feed.repost":
		cat, ok := rec["createdAt"].(string)
		if ok {
			t, err := dateparse.ParseAny(cat)
			if err == nil {
				return &t, nil
			}

			if rkeyTime.IsZero() {
				return nil, fmt.Errorf("failed to get a useful timestamp from record")
			}
		}

		return &rkeyTime, nil
	case "app.bsky.feed.like":
		cat, ok := rec["createdAt"].(string)
		if ok {
			t, err := dateparse.ParseAny(cat)
			if err == nil {
				return &t, nil
			}

			if rkeyTime.IsZero() {
				return nil, fmt.Errorf("failed to get a useful timestamp from record")
			}
		}

		return &rkeyTime, nil
	case "app.bsky.actor.profile":
		// We can't really trust the createdAt in the profile record anyway, and it's very possible it's missing. Just use iat for this one
		return timePtr(time.Now()), nil
	case "app.bsky.feed.generator":
		if !rkeyTime.IsZero() {
			return &rkeyTime, nil
		}
		return timePtr(time.Now()), nil
	default:
		if !rkeyTime.IsZero() {
			return &rkeyTime, nil
		}
		return timePtr(time.Now()), nil
	}
}

func timePtr(t time.Time) *time.Time {
	return &t
}
atkafka/types.go (+30)
···
      AccountAge int64                               `json:"accountAge"`
      Profile    *bsky.ActorDefs_ProfileViewDetailed `json:"profile"`
  }
+
+ type TapEvent struct {
+     Id       uint              `json:"id"`
+     Type     string            `json:"type"`
+     Record   *TapEventRecord   `json:"record,omitempty"`
+     Identity *TapEventIdentity `json:"identity,omitempty"`
+ }
+
+ type TapEventRecord struct {
+     Live       bool            `json:"live"`
+     Rev        string          `json:"rev"`
+     Did        string          `json:"did"`
+     Collection string          `json:"collection"`
+     Rkey       string          `json:"rkey"`
+     Action     string          `json:"action"`
+     Cid        string          `json:"cid"`
+     Record     *map[string]any `json:"record,omitempty"`
+ }
+
+ type TapEventIdentity struct {
+     Did      string `json:"did"`
+     Handle   string `json:"handle"`
+     IsActive bool   `json:"isActive"`
+     Status   string `json:"status"`
+ }
+
+ type TapAck struct {
+     Type string `json:"type"`
+     Id   uint   `json:"id"`
+ }
atkafka-grafana.json (+499, new file)
{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 0,
  "links": [],
  "panels": [
    {
      "datasource": {
        "type": "prometheus",
        "uid": "ef7prazof3yf4d"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "barWidthFactor": 0.6,
            "drawStyle": "line",
            "fillOpacity": 0,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "insertNulls": 1800000,
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "auto",
            "showValues": false,
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 0,
        "y": 0
      },
      "id": 2,
      "options": {
        "legend": {
          "calcs": [],
          "displayMode": "table",
          "placement": "right",
          "showLegend": true
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "single",
          "sort": "none"
        }
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "editorMode": "builder",
          "expr": "sum by(collection, action_name) (rate(atkafka_handled_events{instance=\"$instance\"}[$__rate_interval]))",
          "legendFormat": "{{collection}} ({{action_name}})",
          "range": true,
          "refId": "A"
        }
      ],
      "title": "Firehose Received Events",
      "type": "timeseries"
    },
    {
      "datasource": {
        "type": "prometheus",
        "uid": "ef7prazof3yf4d"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "barWidthFactor": 0.6,
            "drawStyle": "line",
            "fillOpacity": 0,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "insertNulls": 1800000,
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "auto",
            "showValues": false,
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 12,
        "y": 0
      },
      "id": 1,
      "options": {
        "legend": {
          "calcs": [],
          "displayMode": "table",
          "placement": "right",
          "showLegend": true
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "single",
          "sort": "none"
        }
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "editorMode": "builder",
          "expr": "sum by(status) (rate(atkafka_produced_events{instance=\"$instance\"}[$__rate_interval]))",
          "legendFormat": "__auto",
          "range": true,
          "refId": "A"
        }
      ],
      "title": "Produced Events",
      "type": "timeseries"
    },
    {
      "datasource": {
        "type": "prometheus",
        "uid": "ef7prazof3yf4d"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "barWidthFactor": 0.6,
            "drawStyle": "line",
            "fillOpacity": 0,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "insertNulls": 1800000,
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "auto",
            "showValues": false,
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 0,
        "y": 8
      },
      "id": 3,
      "options": {
        "legend": {
          "calcs": [],
          "displayMode": "table",
          "placement": "right",
          "showLegend": true
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "single",
          "sort": "none"
        }
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "editorMode": "builder",
          "expr": "sum by(status, cached, kind) (rate(atkafka_api_requests{instance=\"$instance\"}[$__rate_interval]))",
          "legendFormat": "{{kind}} cached={{cached}} status={{status}}",
          "range": true,
          "refId": "A"
        }
      ],
      "title": "API Requests",
      "type": "timeseries"
    },
    {
      "datasource": {
        "type": "prometheus",
        "uid": "ef7prazof3yf4d"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "barWidthFactor": 0.6,
            "drawStyle": "line",
            "fillOpacity": 0,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "insertNulls": 3600000,
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "auto",
            "showValues": false,
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
+
]
363
+
}
364
+
},
365
+
"overrides": []
366
+
},
367
+
"gridPos": {
368
+
"h": 8,
369
+
"w": 12,
370
+
"x": 12,
371
+
"y": 8
372
+
},
373
+
"id": 4,
374
+
"options": {
375
+
"legend": {
376
+
"calcs": [],
377
+
"displayMode": "table",
378
+
"placement": "right",
379
+
"showLegend": true
380
+
},
381
+
"tooltip": {
382
+
"hideZeros": false,
383
+
"mode": "single",
384
+
"sort": "none"
385
+
}
386
+
},
387
+
"pluginVersion": "12.3.1",
388
+
"targets": [
389
+
{
390
+
"editorMode": "builder",
391
+
"expr": "sum by(kind, status, cached) (rate(atkafka_plc_requests{instance=\"$instance\"}[$__rate_interval]))",
392
+
"legendFormat": "{{kind}} cached={{cached}} status={{status}}",
393
+
"range": true,
394
+
"refId": "A"
395
+
}
396
+
],
397
+
"title": "New panel",
398
+
"type": "timeseries"
399
+
},
400
+
{
401
+
"datasource": {
402
+
"type": "prometheus",
403
+
"uid": "ef7prazof3yf4d"
404
+
},
405
+
"fieldConfig": {
406
+
"defaults": {
407
+
"color": {
408
+
"mode": "thresholds"
409
+
},
410
+
"mappings": [],
411
+
"thresholds": {
412
+
"mode": "absolute",
413
+
"steps": [
414
+
{
415
+
"color": "green",
416
+
"value": 0
417
+
}
418
+
]
419
+
}
420
+
},
421
+
"overrides": []
422
+
},
423
+
"gridPos": {
424
+
"h": 7,
425
+
"w": 6,
426
+
"x": 0,
427
+
"y": 16
428
+
},
429
+
"id": 5,
430
+
"options": {
431
+
"colorMode": "value",
432
+
"graphMode": "none",
433
+
"justifyMode": "center",
434
+
"orientation": "auto",
435
+
"percentChangeColorMode": "standard",
436
+
"reduceOptions": {
437
+
"calcs": ["lastNotNull"],
438
+
"fields": "",
439
+
"values": false
440
+
},
441
+
"showPercentChange": false,
442
+
"text": {
443
+
"titleSize": 40,
444
+
"valueSize": 100
445
+
},
446
+
"textMode": "auto",
447
+
"wideLayout": true
448
+
},
449
+
"pluginVersion": "12.3.1",
450
+
"targets": [
451
+
{
452
+
"editorMode": "builder",
453
+
"expr": "atkafka_cache_size{instance=\"$instance\"}",
454
+
"legendFormat": "{{kind}}",
455
+
"range": true,
456
+
"refId": "A"
457
+
}
458
+
],
459
+
"title": "Cache Sizes",
460
+
"transparent": true,
461
+
"type": "stat"
462
+
}
463
+
],
464
+
"preload": false,
465
+
"schemaVersion": 42,
466
+
"tags": [],
467
+
"templating": {
468
+
"list": [
469
+
{
470
+
"current": {
471
+
"text": "panda",
472
+
"value": "panda"
473
+
},
474
+
"definition": "label_values(atkafka_handled_events,instance)",
475
+
"description": "",
476
+
"label": "Instance",
477
+
"name": "instance",
478
+
"options": [],
479
+
"query": {
480
+
"qryType": 1,
481
+
"query": "label_values(atkafka_handled_events,instance)",
482
+
"refId": "PrometheusVariableQueryEditor-VariableQuery"
483
+
},
484
+
"refresh": 1,
485
+
"regex": "",
486
+
"type": "query"
487
+
}
488
+
]
489
+
},
490
+
"time": {
491
+
"from": "now-3h",
492
+
"to": "now"
493
+
},
494
+
"timepicker": {},
495
+
"timezone": "browser",
496
+
"title": "ATKafka",
497
+
"uid": "adn8c6k",
498
+
"version": 17
499
+
}
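The dashboard above is keyed off the `atkafka_*` Prometheus metrics and an `$instance` template variable. As a quick sanity check before provisioning an export like this into Grafana, the JSON can be parsed and its panel titles listed; the snippet below uses a trimmed stand-in file so it is self-contained (the file name and path are illustrative):

```bash
# Write a trimmed stand-in for a dashboard export, then parse it and
# print the dashboard title plus each panel title.
cat > /tmp/dashboard.json <<'EOF'
{"title": "ATKafka", "panels": [{"title": "Produced Events"}, {"title": "API Requests"}, {"title": "Cache Sizes"}]}
EOF
python3 -c 'import json; d = json.load(open("/tmp/dashboard.json")); print(d["title"], [p["title"] for p in d["panels"]])'
```

Pointing the same parse at the real export is a cheap way to catch truncated or hand-edited JSON before Grafana rejects it.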
**cmd/atkafka/main.go** (+179, -66)
```diff
···
     Flags: []cli.Flag{
         telemetry.CLIFlagDebug,
         telemetry.CLIFlagMetricsListenAddress,
-        &cli.StringFlag{
-            Name:    "relay-host",
-            Usage:   "Websocket host to subscribe to for events",
-            Value:   "wss://bsky.network",
-            EnvVars: []string{"ATKAFKA_RELAY_HOST"},
-        },
-        &cli.StringFlag{
-            Name:    "plc-host",
-            Usage:   "The host of the PLC directory you want to use for event metadata",
-            EnvVars: []string{"ATKAFKA_PLC_HOST"},
-        },
-        &cli.StringFlag{
-            Name:    "api-host",
-            Usage:   "The API host for making XRPC calls. Recommended to use https://public.api.bsky.app",
-            EnvVars: []string{"ATKAFKA_API_HOST"},
-        },
         &cli.StringSliceFlag{
             Name:  "bootstrap-servers",
             Usage: "List of Kafka bootstrap servers",
···
             EnvVars:  []string{"ATKAFKA_OUTPUT_TOPIC"},
             Required: true,
         },
-        &cli.BoolFlag{
-            Name:    "osprey-compatible",
-            Usage:   "Whether or not events should be formulated in an Osprey-compatible format",
-            EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"},
-            Value:   false,
         },
-        &cli.StringSliceFlag{
-            Name:    "watched-services",
-            Usage:   "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.",
-            EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"},
-        },
-        &cli.StringSliceFlag{
-            Name:    "ignored-services",
-            Usage:   "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.",
-            EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"},
-        },
-        &cli.StringSliceFlag{
-            Name:    "watched-collections",
-            Usage:   "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.",
-            EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"},
-        },
-        &cli.StringSliceFlag{
-            Name:    "ignored-collections",
-            Usage:   "A list of collections that you want to ignore. Wildcards like *.bsky.app are allowed.",
-            EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"},
-        },
-    },
-    Action: func(cmd *cli.Context) error {
-        ctx := context.Background()

-        telemetry.StartMetrics(cmd)
-        logger := telemetry.StartLogger(cmd)

-        s, err := atkafka.NewServer(&atkafka.ServerArgs{
-            RelayHost:          cmd.String("relay-host"),
-            PlcHost:            cmd.String("plc-host"),
-            ApiHost:            cmd.String("api-host"),
-            BootstrapServers:   cmd.StringSlice("bootstrap-servers"),
-            OutputTopic:        cmd.String("output-topic"),
-            OspreyCompat:       cmd.Bool("osprey-compatible"),
-            WatchedServices:    cmd.StringSlice("watched-services"),
-            IgnoredServices:    cmd.StringSlice("ignored-services"),
-            WatchedCollections: cmd.StringSlice("watched-collections"),
-            IgnoredCollections: cmd.StringSlice("ignored-collections"),
-            Logger:             logger,
-        })
-        if err != nil {
-            return fmt.Errorf("failed to create new server: %w", err)
-        }

-        if err := s.Run(ctx); err != nil {
-            return fmt.Errorf("error running server: %w", err)
-        }

-        return nil
     },
 }
···
     Flags: []cli.Flag{
         telemetry.CLIFlagDebug,
         telemetry.CLIFlagMetricsListenAddress,
         &cli.StringSliceFlag{
             Name:  "bootstrap-servers",
             Usage: "List of Kafka bootstrap servers",
···
             EnvVars:  []string{"ATKAFKA_OUTPUT_TOPIC"},
             Required: true,
         },
+    },
+    Commands: cli.Commands{
+        &cli.Command{
+            Name: "firehose",
+            Flags: []cli.Flag{
+                &cli.StringFlag{
+                    Name:    "relay-host",
+                    Usage:   "Websocket host to subscribe to for events",
+                    Value:   "wss://bsky.network",
+                    EnvVars: []string{"ATKAFKA_RELAY_HOST"},
+                },
+                &cli.StringFlag{
+                    Name:    "plc-host",
+                    Usage:   "The host of the PLC directory you want to use for event metadata",
+                    EnvVars: []string{"ATKAFKA_PLC_HOST"},
+                },
+                &cli.StringFlag{
+                    Name:    "api-host",
+                    Usage:   "The API host for making XRPC calls. Recommended to use https://public.api.bsky.app",
+                    EnvVars: []string{"ATKAFKA_API_HOST"},
+                },
+                &cli.IntFlag{
+                    Name:    "tap-workers",
+                    Usage:   "Number of workers to process events from Tap",
+                    Value:   10,
+                    EnvVars: []string{"ATKAFKA_TAP_WORKERS"},
+                },
+                &cli.BoolFlag{
+                    Name:    "osprey-compatible",
+                    Usage:   "Whether or not events should be formulated in an Osprey-compatible format",
+                    EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"},
+                    Value:   false,
+                },
+                &cli.StringSliceFlag{
+                    Name:    "watched-services",
+                    Usage:   "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.",
+                    EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"},
+                },
+                &cli.StringSliceFlag{
+                    Name:    "ignored-services",
+                    Usage:   "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.",
+                    EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"},
+                },
+                &cli.StringSliceFlag{
+                    Name:    "watched-collections",
+                    Usage:   "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.",
+                    EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"},
+                },
+                &cli.StringSliceFlag{
+                    Name:    "ignored-collections",
+                    Usage:   "A list of collections that you want to ignore. Wildcards like *.bsky.app are allowed.",
+                    EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"},
+                },
+            },
+            Subcommands: cli.Commands{
+                &cli.Command{
+                    Name: "relay",
+                    Action: func(cmd *cli.Context) error {
+                        ctx := context.Background()
+
+                        telemetry.StartMetrics(cmd)
+                        logger := telemetry.StartLogger(cmd)
+
+                        s, err := atkafka.NewServer(&atkafka.ServerArgs{
+                            RelayHost:          cmd.String("relay-host"),
+                            PlcHost:            cmd.String("plc-host"),
+                            ApiHost:            cmd.String("api-host"),
+                            BootstrapServers:   cmd.StringSlice("bootstrap-servers"),
+                            OutputTopic:        cmd.String("output-topic"),
+                            OspreyCompat:       cmd.Bool("osprey-compatible"),
+                            WatchedServices:    cmd.StringSlice("watched-services"),
+                            IgnoredServices:    cmd.StringSlice("ignored-services"),
+                            WatchedCollections: cmd.StringSlice("watched-collections"),
+                            IgnoredCollections: cmd.StringSlice("ignored-collections"),
+                            Logger:             logger,
+                        })
+                        if err != nil {
+                            return fmt.Errorf("failed to create new server: %w", err)
+                        }
+
+                        if err := s.Run(ctx); err != nil {
+                            return fmt.Errorf("error running server: %w", err)
+                        }
+
+                        return nil
+                    },
+                },
+                &cli.Command{
+                    Name: "tap",
+                    Flags: []cli.Flag{
+                        &cli.StringFlag{
+                            Name:    "tap-host",
+                            Usage:   "Tap host to subscribe to for events",
+                            Value:   "ws://localhost:2480",
+                            EnvVars: []string{"ATKAFKA_TAP_HOST"},
+                        },
+                        &cli.BoolFlag{
+                            Name:    "disable-acks",
+                            Usage:   "Set to `true` to disable sending of event acks to Tap. May result in the loss of events.",
+                            EnvVars: []string{"ATKAFKA_DISABLE_ACKS"},
+                        },
+                    },
+                    Action: func(cmd *cli.Context) error {
+                        ctx := context.Background()
+
+                        telemetry.StartMetrics(cmd)
+                        logger := telemetry.StartLogger(cmd)
+
+                        s, err := atkafka.NewServer(&atkafka.ServerArgs{
+                            TapHost:            cmd.String("tap-host"),
+                            DisableAcks:        cmd.Bool("disable-acks"),
+                            BootstrapServers:   cmd.StringSlice("bootstrap-servers"),
+                            OutputTopic:        cmd.String("output-topic"),
+                            WatchedCollections: cmd.StringSlice("watched-collections"),
+                            IgnoredCollections: cmd.StringSlice("ignored-collections"),
+                            Logger:             logger,
+                        })
+                        if err != nil {
+                            return fmt.Errorf("failed to create new server: %w", err)
+                        }
+
+                        if err := s.RunTapMode(ctx); err != nil {
+                            return fmt.Errorf("error running server: %w", err)
+                        }
+
+                        return nil
+                    },
+                },
+            },
         },
+        &cli.Command{
+            Name: "ozone-events",
+            Flags: []cli.Flag{
+                &cli.StringFlag{
+                    Name:     "pds-host",
+                    EnvVars:  []string{"ATKAFKA_OZONE_PDS_HOST"},
+                    Required: true,
+                },
+                &cli.StringFlag{
+                    Name:     "identifier",
+                    EnvVars:  []string{"ATKAFKA_OZONE_IDENTIFIER"},
+                    Required: true,
+                },
+                &cli.StringFlag{
+                    Name:     "password",
+                    EnvVars:  []string{"ATKAFKA_OZONE_PASSWORD"},
+                    Required: true,
+                },
+                &cli.StringFlag{
+                    Name:     "labeler-did",
+                    EnvVars:  []string{"ATKAFKA_OZONE_LABELER_DID"},
+                    Required: true,
+                },
+            },
+            Action: func(cmd *cli.Context) error {
+                ctx := context.Background()

+                telemetry.StartMetrics(cmd)
+                logger := telemetry.StartLogger(cmd)

+                s, err := atkafka.NewOzoneServer(&atkafka.OzoneServerArgs{
+                    Logger:           logger,
+                    BootstrapServers: cmd.StringSlice("bootstrap-servers"),
+                    OutputTopic:      cmd.String("output-topic"),
+                    OzoneClientArgs: &atkafka.OzoneClientArgs{
+                        OzonePdsHost:    cmd.String("pds-host"),
+                        OzoneIdentifier: cmd.String("identifier"),
+                        OzonePassword:   cmd.String("password"),
+                        OzoneLabelerDid: cmd.String("labeler-did"),
+                    },
+                })
+                if err != nil {
+                    return fmt.Errorf("failed to create new ozone server: %w", err)
+                }

+                if err := s.Run(ctx); err != nil {
+                    return fmt.Errorf("error running ozone server: %w", err)
+                }

+                return nil
+            },
+        },
     },
 }
```
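The flat single-action CLI above becomes a command tree: `firehose` (with `relay` and `tap` subcommands) and `ozone-events`. A rough sketch of the resulting invocation shapes, where the `atkafka` binary name is an assumption and the shared `--bootstrap-servers`/`--output-topic` flags are supplied via their env vars:

```bash
# Shared flags can come from the environment (they are app-level flags).
export ATKAFKA_BOOTSTRAP_SERVERS="kafka:9092"
export ATKAFKA_OUTPUT_TOPIC="atproto-events"
# Print the three command paths introduced by this change.
for sub in "firehose relay" "firehose tap" "ozone-events"; do
  echo "atkafka $sub"
done
```

Note that `ozone-events` additionally requires `--pds-host`, `--identifier`, `--password`, and `--labeler-did` (or their `ATKAFKA_OZONE_*` env vars).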
**docker-compose.ozone.yml** (new, +39)
```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  atkafka:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: atkafka
    command: ["ozone-events"]
    ports:
      - "2112:2112"
    environment:
      ATKAFKA_RELAY_HOST: "wss://bsky.network"
      ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
      ATKAFKA_OUTPUT_TOPIC: "atproto-events"
      ATKAFKA_OSPREY_COMPATIBLE: "false"
      ATKAFKA_PLC_HOST: "https://plc.directory"
      ATKAFKA_API_HOST: "https://public.api.bsky.app"
    volumes:
      - atkafka-data:/data
    working_dir: /data
    restart: unless-stopped

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-data:
  atkafka-data:
```
**docker-compose.tap.yml** (new, +91)
```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    healthcheck:
      test:
        [
          "CMD",
          "bash",
          "-c",
          "kafka-topics --bootstrap-server kafka:29092 --list",
        ]
    volumes:
      - kafka-data:/var/lib/kafka/data

  tap:
    image: ghcr.io/bluesky-social/indigo/tap:latest
    hostname: tap
    container_name: tap
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "2480:2480"
      - "6010:6010"
    environment:
      TAP_BIND: ":2480"
      TAP_FULL_NETWORK: true
      TAP_DISABLE_ACKS: false
      TAP_COLLECTION_FILTERS: "app.bsky.graph.follow"
      TAP_METRICS_LISTEN: ":6010"
    volumes:
      - tap-data:/data

  atkafka:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: atkafka
    depends_on:
      - kafka
      - tap
    ports:
      # metrics port
      - "6009:6009"
    command: ["firehose", "tap"]
    environment:
      ATKAFKA_TAP_HOST: "ws://tap:2480"
      ATKAFKA_DISABLE_ACKS: false
      ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
      ATKAFKA_OUTPUT_TOPIC: "tap-events"
    restart: unless-stopped

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-data:
  tap-data:
```
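Each mode now ships with its own compose stack. A hedged usage sketch (the commands are printed rather than executed here; run the `docker compose` line for the stack you want):

```bash
# One compose file per mode; the default docker-compose.yml runs relay mode.
for f in docker-compose.tap.yml docker-compose.ozone.yml; do
  echo "docker compose -f $f up -d"
done
```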
**docker-compose.yml** (+2, -6)
```diff
···
       context: .
       dockerfile: Dockerfile
     container_name: atkafka
-    depends_on:
-      - kafka
+    command: ["firehose", "relay"]
     ports:
       - "2112:2112"
     environment:
···
       ATKAFKA_OUTPUT_TOPIC: "atproto-events"
       ATKAFKA_OSPREY_COMPATIBLE: "false"
       ATKAFKA_PLC_HOST: "https://plc.directory"
-
-      ATKAFKA_WATCHED_SERVICES: "*.bsky.network"
-
-      ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*"
+      ATKAFKA_API_HOST: "https://public.api.bsky.app"
     restart: unless-stopped

 volumes:
```
**go.mod** (+2, -1)
```diff
···
 go 1.25.4

 require (
+	github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
 	github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934
 	github.com/bluesky-social/indigo v0.0.0-20251125184450-35c1e15d2e5f
 	github.com/gorilla/websocket v1.5.3
···
 	github.com/prometheus/client_golang v1.23.2
 	github.com/twmb/franz-go v1.19.5
 	github.com/urfave/cli/v2 v2.25.7
+	golang.org/x/sync v0.16.0
 	golang.org/x/time v0.6.0
 )
···
 	go.yaml.in/yaml/v2 v2.4.2 // indirect
 	golang.org/x/crypto v0.40.0 // indirect
 	golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
-	golang.org/x/sync v0.16.0 // indirect
 	golang.org/x/sys v0.35.0 // indirect
 	golang.org/x/text v0.28.0 // indirect
 	golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
```
**go.sum** (+5)
```diff
···
 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM=
 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
+github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
+github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
···
 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
 github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
···
 github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
 github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
```